Nova patch for stable/pike: SQLAlchemy API backend (nova/db/sqlalchemy/api.py)

# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Implementation of SQLAlchemy backend."""

import collections
import copy
import datetime
import functools
import inspect
import sys

from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from six.moves import range
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload_all
from sqlalchemy.orm import noload
from sqlalchemy.orm import undefer
from sqlalchemy.schema import Table
from sqlalchemy import sql
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import false
from sqlalchemy.sql import func
from sqlalchemy.sql import null
from sqlalchemy.sql import true

from nova import block_device
from nova.compute import task_states
from nova.compute import vm_states
import nova.conf
import nova.context
from nova.db.sqlalchemy import models
from nova import exception
from nova.i18n import _
from nova import safe_utils

profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')

CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)

main_context_manager = enginefacade.transaction_context()
api_context_manager = enginefacade.transaction_context()


def _get_db_conf(conf_group, connection=None):
    kw = dict(
        connection=connection or conf_group.connection,
        slave_connection=conf_group.slave_connection,
        sqlite_fk=False,
        __autocommit=True,
        expire_on_commit=False,
        mysql_sql_mode=conf_group.mysql_sql_mode,
        idle_timeout=conf_group.idle_timeout,
        connection_debug=conf_group.connection_debug,
        max_pool_size=conf_group.max_pool_size,
        max_overflow=conf_group.max_overflow,
        pool_timeout=conf_group.pool_timeout,
        sqlite_synchronous=conf_group.sqlite_synchronous,
        connection_trace=conf_group.connection_trace,
        max_retries=conf_group.max_retries,
        retry_interval=conf_group.retry_interval)
    return kw


def _context_manager_from_context(context):
    if context:
        try:
            return context.db_connection
        except AttributeError:
            pass


def configure(conf):
    main_context_manager.configure(**_get_db_conf(conf.database))
    api_context_manager.configure(**_get_db_conf(conf.api_database))

    if profiler_sqlalchemy and CONF.profiler.enabled \
            and CONF.profiler.trace_sqlalchemy:
        main_context_manager.append_on_engine_create(
            lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
        api_context_manager.append_on_engine_create(
            lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))


def create_context_manager(connection=None):
    """Create a database context manager object.

    :param connection: The database connection string
    """
    ctxt_mgr = enginefacade.transaction_context()
    ctxt_mgr.configure(**_get_db_conf(CONF.database, connection=connection))
    return ctxt_mgr


def get_context_manager(context):
    """Get a database context manager object.

    :param context: The request context that can contain a context manager
    """
    return _context_manager_from_context(context) or main_context_manager


def get_engine(use_slave=False, context=None):
    """Get a database engine object.

    :param use_slave: Whether to use the slave connection
    :param context: The request context that can contain a context manager
    """
    ctxt_mgr = get_context_manager(context)
    return ctxt_mgr.get_legacy_facade().get_engine(use_slave=use_slave)


def get_api_engine():
    return api_context_manager.get_legacy_facade().get_engine()


_SHADOW_TABLE_PREFIX = 'shadow_'
_DEFAULT_QUOTA_NAME = 'default'
PER_PROJECT_QUOTAS = ['fixed_ips', 'floating_ips', 'networks']


def get_backend():
    """The backend is this module itself."""
    return sys.modules[__name__]


def require_context(f):
    """Decorator to require *any* user or admin context.

    This does no authorization for user or project access matching, see
    :py:func:`nova.context.authorize_project_context` and
    :py:func:`nova.context.authorize_user_context`.

    The first argument to the wrapped function must be the context.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        nova.context.require_context(args[0])
        return f(*args, **kwargs)
    return wrapper
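

# A minimal usage sketch (the function below is hypothetical, not part of
# this module): require_context only verifies that *a* user or admin context
# is present; project/user scoping must still be applied explicitly.
#
#     @require_context
#     @pick_context_manager_reader
#     def service_list_for_context(context):
#         return model_query(context, models.Service).all()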


def require_instance_exists_using_uuid(f):
    """Decorator to require the specified instance to exist.

    Requires the wrapped function to use context and instance_uuid as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, instance_uuid, *args, **kwargs):
        instance_get_by_uuid(context, instance_uuid)
        return f(context, instance_uuid, *args, **kwargs)

    return wrapper


def require_aggregate_exists(f):
    """Decorator to require the specified aggregate to exist.

    Requires the wrapped function to use context and aggregate_id as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, aggregate_id, *args, **kwargs):
        aggregate_get(context, aggregate_id)
        return f(context, aggregate_id, *args, **kwargs)
    return wrapper


def select_db_reader_mode(f):
    """Decorator to select synchronous or asynchronous reader mode.

    The kwarg argument 'use_slave' defines reader mode. The asynchronous
    reader will be used if 'use_slave' is True and the synchronous reader
    otherwise. If 'use_slave' is not specified, the default value 'False'
    is used.

    Wrapped function must have a context in the arguments.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        wrapped_func = safe_utils.get_wrapped_function(f)
        keyed_args = inspect.getcallargs(wrapped_func, *args, **kwargs)

        context = keyed_args['context']
        use_slave = keyed_args.get('use_slave', False)

        if use_slave:
            reader_mode = get_context_manager(context).async
        else:
            reader_mode = get_context_manager(context).reader

        with reader_mode.using(context):
            return f(*args, **kwargs)
    return wrapper
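

# A sketch of select_db_reader_mode in use (hypothetical function): callers
# opt into the asynchronous (slave) reader by passing use_slave=True, which
# must appear as a named argument in the wrapped signature.
#
#     @select_db_reader_mode
#     def service_list(context, use_slave=False):
#         return model_query(context, models.Service).all()
#
#     service_list(ctxt, use_slave=True)   # routed to the slave connection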


def pick_context_manager_writer(f):
    """Decorator to use a writer db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        ctxt_mgr = get_context_manager(context)
        with ctxt_mgr.writer.using(context):
            return f(context, *args, **kwargs)
    return wrapped


def pick_context_manager_reader(f):
    """Decorator to use a reader db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        ctxt_mgr = get_context_manager(context)
        with ctxt_mgr.reader.using(context):
            return f(context, *args, **kwargs)
    return wrapped


def pick_context_manager_reader_allow_async(f):
    """Decorator to use a reader.allow_async db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        ctxt_mgr = get_context_manager(context)
        with ctxt_mgr.reader.allow_async.using(context):
            return f(context, *args, **kwargs)
    return wrapped
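

# All three pick_context_manager_* decorators are applied the same way; they
# differ only in which enginefacade mode supplies context.session. A sketch
# (hypothetical function, reusing service_get() defined below):
#
#     @pick_context_manager_writer
#     def service_force_down(context, service_id):
#         service = service_get(context, service_id)
#         service.update({'forced_down': True})
#         # the write is flushed when the writer transaction exits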


def model_query(context, model,
                args=None,
                read_deleted=None,
                project_only=False):
    """Query helper that accounts for context's `read_deleted` field.

    :param context: NovaContext of the query.
    :param model: Model to query. Must be a subclass of ModelBase.
    :param args: Arguments to query. If None - model is used.
    :param read_deleted: If not None, overrides context's read_deleted field.
                         Permitted values are 'no', which does not return
                         deleted values; 'only', which only returns deleted
                         values; and 'yes', which does not filter deleted
                         values.
    :param project_only: If set and context is user-type, then restrict
                         query to match the context's project_id. If set to
                         'allow_none', restriction includes project_id = None.
    """
    if read_deleted is None:
        read_deleted = context.read_deleted

    query_kwargs = {}
    if 'no' == read_deleted:
        query_kwargs['deleted'] = False
    elif 'only' == read_deleted:
        query_kwargs['deleted'] = True
    elif 'yes' == read_deleted:
        pass
    else:
        raise ValueError(_("Unrecognized read_deleted value '%s'")
                         % read_deleted)

    query = sqlalchemyutils.model_query(
        model, context.session, args, **query_kwargs)

    # We can't use oslo.db model_query's project_id here, as it doesn't allow
    # us to return both our projects and unowned projects.
    if nova.context.is_user_context(context) and project_only:
        if project_only == 'allow_none':
            query = query.\
                filter(or_(model.project_id == context.project_id,
                           model.project_id == null()))
        else:
            query = query.filter_by(project_id=context.project_id)

    return query
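

# Example (illustrative): a query for the caller's non-deleted instances,
# also matching rows whose project_id is NULL:
#
#     query = model_query(context, models.Instance, read_deleted='no',
#                         project_only='allow_none')
#     active = query.filter_by(vm_state=vm_states.ACTIVE).all()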


def convert_objects_related_datetimes(values, *datetime_keys):
    if not datetime_keys:
        datetime_keys = ('created_at', 'deleted_at', 'updated_at')

    for key in datetime_keys:
        if key in values and values[key]:
            if isinstance(values[key], six.string_types):
                try:
                    values[key] = timeutils.parse_strtime(values[key])
                except ValueError:
                    # Fall back to ISO 8601 parsing, since parse_strtime
                    # fails on strings such as '2015-05-28T19:59:38+00:00'
                    values[key] = timeutils.parse_isotime(values[key])
            # NOTE(danms): Strip UTC timezones from datetimes, since they're
            # stored that way in the database
            values[key] = values[key].replace(tzinfo=None)
    return values
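

# Example (illustrative): string timestamps are parsed and made naive in
# place, so the dict can be handed straight to the database layer:
#
#     values = {'created_at': '2015-05-28T19:59:38+00:00'}
#     convert_objects_related_datetimes(values, 'created_at')
#     # values['created_at'] is now datetime(2015, 5, 28, 19, 59, 38),
#     # with the timezone stripped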


def _sync_instances(context, project_id, user_id):
    return dict(zip(('instances', 'cores', 'ram'),
                    _instance_data_get_for_user(context, project_id,
                                                user_id)))


def _sync_floating_ips(context, project_id, user_id):
    return dict(floating_ips=_floating_ip_count_by_project(
        context, project_id))


def _sync_fixed_ips(context, project_id, user_id):
    return dict(fixed_ips=_fixed_ip_count_by_project(context, project_id))


def _sync_security_groups(context, project_id, user_id):
    return dict(security_groups=_security_group_count_by_project_and_user(
        context, project_id, user_id))


def _sync_server_groups(context, project_id, user_id):
    return dict(server_groups=_instance_group_count_by_project_and_user(
        context, project_id, user_id))


QUOTA_SYNC_FUNCTIONS = {
    '_sync_instances': _sync_instances,
    '_sync_floating_ips': _sync_floating_ips,
    '_sync_fixed_ips': _sync_fixed_ips,
    '_sync_security_groups': _sync_security_groups,
    '_sync_server_groups': _sync_server_groups,
}


###################


def constraint(**conditions):
    return Constraint(conditions)


def equal_any(*values):
    return EqualityCondition(values)


def not_equal(*values):
    return InequalityCondition(values)


class Constraint(object):
    def __init__(self, conditions):
        self.conditions = conditions

    def apply(self, model, query):
        for key, condition in self.conditions.items():
            for clause in condition.clauses(getattr(model, key)):
                query = query.filter(clause)
        return query


class EqualityCondition(object):
    def __init__(self, values):
        self.values = values

    def clauses(self, field):
        # method signature requires us to return an iterable even if for OR
        # operator this will actually be a single clause
        return [or_(*[field == value for value in self.values])]


class InequalityCondition(object):
    def __init__(self, values):
        self.values = values

    def clauses(self, field):
        return [field != value for value in self.values]
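

# Example (illustrative): these helpers build the optimistic-concurrency
# constraint consumed by instance_destroy() below, e.g. only delete an
# instance while its task_state is still unset:
#
#     instance_destroy(ctxt, instance_uuid,
#                      constraint=constraint(task_state=equal_any(None)))
#
# If another writer changed the row first, the query matches nothing and
# ConstraintNotMet is raised.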


class DeleteFromSelect(UpdateBase):
    def __init__(self, table, select, column):
        self.table = table
        self.select = select
        self.column = column


# NOTE(guochbo): some versions of MySQL don't yet support subqueries using
# 'LIMIT & IN/ALL/ANY/SOME', so we work around that by nesting the select.
@compiles(DeleteFromSelect)
def visit_delete_from_select(element, compiler, **kw):
    return "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)" % (
        compiler.process(element.table, asfrom=True),
        compiler.process(element.column),
        element.column.name,
        compiler.process(element.select))
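

# Example (illustrative): for a DeleteFromSelect over the instances table
# keyed on its id column, the hook above emits SQL roughly like:
#
#     DELETE FROM instances WHERE id in (
#         SELECT T1.id FROM (SELECT id FROM instances LIMIT 10) as T1)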


###################


@pick_context_manager_writer
def service_destroy(context, service_id):
    service = service_get(context, service_id)

    model_query(context, models.Service).\
        filter_by(id=service_id).\
        soft_delete(synchronize_session=False)

    # TODO(sbauza): Remove the service_id filter in a later release
    # once we are sure that all compute nodes report the host field
    model_query(context, models.ComputeNode).\
        filter(or_(models.ComputeNode.service_id == service_id,
                   models.ComputeNode.host == service['host'])).\
        soft_delete(synchronize_session=False)


@pick_context_manager_reader
def service_get(context, service_id):
    query = model_query(context, models.Service).filter_by(id=service_id)

    result = query.first()
    if not result:
        raise exception.ServiceNotFound(service_id=service_id)

    return result


@pick_context_manager_reader
def service_get_by_uuid(context, service_uuid):
    query = model_query(context, models.Service).filter_by(uuid=service_uuid)

    result = query.first()
    if not result:
        raise exception.ServiceNotFound(service_id=service_uuid)

    return result


@pick_context_manager_reader_allow_async
def service_get_minimum_version(context, binaries):
    min_versions = context.session.query(
        models.Service.binary,
        func.min(models.Service.version)).\
        filter(models.Service.binary.in_(binaries)).\
        filter(models.Service.deleted == 0).\
        filter(models.Service.forced_down == false()).\
        group_by(models.Service.binary)
    return dict(min_versions)


@pick_context_manager_reader
def service_get_all(context, disabled=None):
    query = model_query(context, models.Service)

    if disabled is not None:
        query = query.filter_by(disabled=disabled)

    return query.all()


@pick_context_manager_reader
def service_get_all_by_topic(context, topic):
    return model_query(context, models.Service, read_deleted="no").\
        filter_by(disabled=False).\
        filter_by(topic=topic).\
        all()


@pick_context_manager_reader
def service_get_by_host_and_topic(context, host, topic):
    return model_query(context, models.Service, read_deleted="no").\
        filter_by(disabled=False).\
        filter_by(host=host).\
        filter_by(topic=topic).\
        first()


@pick_context_manager_reader
def service_get_all_by_binary(context, binary, include_disabled=False):
    query = model_query(context, models.Service, read_deleted="no").\
        filter_by(binary=binary)
    if not include_disabled:
        query = query.filter_by(disabled=False)
    return query.all()


@pick_context_manager_reader
def service_get_all_computes_by_hv_type(context, hv_type,
                                        include_disabled=False):
    query = model_query(context, models.Service, read_deleted="no").\
        filter_by(binary='nova-compute')
    if not include_disabled:
        query = query.filter_by(disabled=False)
    query = query.join(models.ComputeNode,
                       models.Service.host == models.ComputeNode.host).\
        filter(models.ComputeNode.hypervisor_type == hv_type).\
        distinct('host')
    return query.all()


@pick_context_manager_reader
def service_get_by_host_and_binary(context, host, binary):
    result = model_query(context, models.Service, read_deleted="no").\
        filter_by(host=host).\
        filter_by(binary=binary).\
        first()

    if not result:
        raise exception.HostBinaryNotFound(host=host, binary=binary)

    return result


@pick_context_manager_reader
def service_get_all_by_host(context, host):
    return model_query(context, models.Service, read_deleted="no").\
        filter_by(host=host).\
        all()


@pick_context_manager_reader_allow_async
def service_get_by_compute_host(context, host):
    result = model_query(context, models.Service, read_deleted="no").\
        filter_by(host=host).\
        filter_by(binary='nova-compute').\
        first()

    if not result:
        raise exception.ComputeHostNotFound(host=host)

    return result


@pick_context_manager_writer
def service_create(context, values):
    service_ref = models.Service()
    service_ref.update(values)
    # We only auto-disable nova-compute services since those are the only
    # ones that can be enabled using the os-services REST API and they are
    # the only ones where being disabled means anything. It does
    # not make sense to be able to disable non-compute services like
    # nova-scheduler or nova-osapi_compute since that does nothing.
    if not CONF.enable_new_services and values.get('binary') == 'nova-compute':
        msg = _("New compute service disabled due to config option.")
        service_ref.disabled = True
        service_ref.disabled_reason = msg
    try:
        service_ref.save(context.session)
    except db_exc.DBDuplicateEntry as e:
        if 'binary' in e.columns:
            raise exception.ServiceBinaryExists(host=values.get('host'),
                                                binary=values.get('binary'))
        raise exception.ServiceTopicExists(host=values.get('host'),
                                           topic=values.get('topic'))
    return service_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def service_update(context, service_id, values):
    service_ref = service_get(context, service_id)
    # Only servicegroup.drivers.db.DbDriver._report_state() updates
    # 'report_count', so if that value changes then store the timestamp
    # as the last time we got a state report.
    if 'report_count' in values:
        if values['report_count'] > service_ref.report_count:
            service_ref.last_seen_up = timeutils.utcnow()

    service_ref.update(values)

    return service_ref


###################


def _compute_node_select(context, filters=None, limit=None, marker=None):
    if filters is None:
        filters = {}

    cn_tbl = sa.alias(models.ComputeNode.__table__, name='cn')
    select = sa.select([cn_tbl])

    if context.read_deleted == "no":
        select = select.where(cn_tbl.c.deleted == 0)
    if "compute_id" in filters:
        select = select.where(cn_tbl.c.id == filters["compute_id"])
    if "service_id" in filters:
        select = select.where(cn_tbl.c.service_id == filters["service_id"])
    if "host" in filters:
        select = select.where(cn_tbl.c.host == filters["host"])
    if "hypervisor_hostname" in filters:
        hyp_hostname = filters["hypervisor_hostname"]
        select = select.where(cn_tbl.c.hypervisor_hostname == hyp_hostname)
    if "mapped" in filters:
        select = select.where(cn_tbl.c.mapped < filters['mapped'])
    if marker is not None:
        try:
            compute_node_get(context, marker)
        except exception.ComputeHostNotFound:
            raise exception.MarkerNotFound(marker=marker)
        select = select.where(cn_tbl.c.id > marker)
    if limit is not None:
        select = select.limit(limit)
    # Explicitly order by id, so we're not dependent on the native sort
    # order of the underlying DB.
    select = select.order_by(asc("id"))
    return select
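

# Example (illustrative): the filters dict drives equality predicates (and a
# less-than predicate for 'mapped'), while marker/limit give simple keyset
# pagination over the id column (last_seen_id being a hypothetical variable
# holding the id of the last row already consumed):
#
#     select = _compute_node_select(ctxt, {'host': 'compute-1'},
#                                   limit=50, marker=last_seen_id)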


def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
    select = _compute_node_select(context, filters, limit=limit,
                                  marker=marker)
    engine = get_engine(context=context)
    conn = engine.connect()

    results = conn.execute(select).fetchall()

    # Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
    results = [dict(r) for r in results]
    conn.close()
    return results


@pick_context_manager_reader
def compute_node_get(context, compute_id):
    results = _compute_node_fetchall(context, {"compute_id": compute_id})
    if not results:
        raise exception.ComputeHostNotFound(host=compute_id)
    return results[0]


@pick_context_manager_reader
def compute_node_get_model(context, compute_id):
    # TODO(edleafe): remove once the compute node resource provider migration
    # is complete, and this distinction is no longer necessary.
    result = model_query(context, models.ComputeNode).\
        filter_by(id=compute_id).\
        first()
    if not result:
        raise exception.ComputeHostNotFound(host=compute_id)
    return result


@pick_context_manager_reader
def compute_nodes_get_by_service_id(context, service_id):
    results = _compute_node_fetchall(context, {"service_id": service_id})
    if not results:
        raise exception.ServiceNotFound(service_id=service_id)
    return results


@pick_context_manager_reader
def compute_node_get_by_host_and_nodename(context, host, nodename):
    results = _compute_node_fetchall(context,
                                     {"host": host,
                                      "hypervisor_hostname": nodename})
    if not results:
        raise exception.ComputeHostNotFound(host=host)
    return results[0]


@pick_context_manager_reader_allow_async
def compute_node_get_all_by_host(context, host):
    results = _compute_node_fetchall(context, {"host": host})
    if not results:
        raise exception.ComputeHostNotFound(host=host)
    return results


@pick_context_manager_reader
def compute_node_get_all(context):
    return _compute_node_fetchall(context)


@pick_context_manager_reader
def compute_node_get_all_mapped_less_than(context, mapped_less_than):
    return _compute_node_fetchall(context,
                                  {'mapped': mapped_less_than})


@pick_context_manager_reader
def compute_node_get_all_by_pagination(context, limit=None, marker=None):
    return _compute_node_fetchall(context, limit=limit, marker=marker)


@pick_context_manager_reader
def compute_node_search_by_hypervisor(context, hypervisor_match):
    field = models.ComputeNode.hypervisor_hostname
    return model_query(context, models.ComputeNode).\
        filter(field.like('%%%s%%' % hypervisor_match)).\
        all()


@pick_context_manager_writer
def compute_node_create(context, values):
    """Creates a new ComputeNode and populates the capacity fields
    with the most recent data.
    """
    convert_objects_related_datetimes(values)

    compute_node_ref = models.ComputeNode()
    compute_node_ref.update(values)
    compute_node_ref.save(context.session)
    return compute_node_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def compute_node_update(context, compute_id, values):
    """Updates the ComputeNode record with the most recent data."""

    compute_ref = compute_node_get_model(context, compute_id)
    # Always update this, even if there's going to be no other
    # changes in data. This ensures that we invalidate the
    # scheduler cache of compute node data in case of races.
    values['updated_at'] = timeutils.utcnow()
    convert_objects_related_datetimes(values)
    compute_ref.update(values)

    return compute_ref


@pick_context_manager_writer
def compute_node_delete(context, compute_id):
    """Delete a ComputeNode record."""
    result = model_query(context, models.ComputeNode).\
        filter_by(id=compute_id).\
        soft_delete(synchronize_session=False)

    if not result:
        raise exception.ComputeHostNotFound(host=compute_id)


@pick_context_manager_reader
def compute_node_statistics(context):
    """Compute statistics over all compute nodes."""
    engine = get_engine(context=context)
    services_tbl = models.Service.__table__

    inner_sel = sa.alias(_compute_node_select(context), name='inner_sel')

    # TODO(sbauza): Remove the service_id filter in a later release
    # once we are sure that all compute nodes report the host field
    j = sa.join(
        inner_sel, services_tbl,
        sql.and_(
            sql.or_(
                inner_sel.c.host == services_tbl.c.host,
                inner_sel.c.service_id == services_tbl.c.id
            ),
            services_tbl.c.disabled == false(),
            services_tbl.c.binary == 'nova-compute',
            services_tbl.c.deleted == 0
        )
    )

    # NOTE(jaypipes): This COALESCE() stuff is temporary while the data
    # migration to the new resource providers inventories and allocations
    # tables is completed.
    agg_cols = [
        func.count().label('count'),
        sql.func.sum(
            inner_sel.c.vcpus
        ).label('vcpus'),
        sql.func.sum(
            inner_sel.c.memory_mb
        ).label('memory_mb'),
        sql.func.sum(
            inner_sel.c.local_gb
        ).label('local_gb'),
        sql.func.sum(
            inner_sel.c.vcpus_used
        ).label('vcpus_used'),
        sql.func.sum(
            inner_sel.c.memory_mb_used
        ).label('memory_mb_used'),
        sql.func.sum(
            inner_sel.c.local_gb_used
        ).label('local_gb_used'),
        sql.func.sum(
            inner_sel.c.free_ram_mb
        ).label('free_ram_mb'),
        sql.func.sum(
            inner_sel.c.free_disk_gb
        ).label('free_disk_gb'),
        sql.func.sum(
            inner_sel.c.current_workload
        ).label('current_workload'),
        sql.func.sum(
            inner_sel.c.running_vms
        ).label('running_vms'),
        sql.func.sum(
            inner_sel.c.disk_available_least
        ).label('disk_available_least'),
    ]
    select = sql.select(agg_cols).select_from(j)
    conn = engine.connect()

    results = conn.execute(select).fetchone()

    # Build a dict of the info--making no assumptions about result
    fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
              'memory_mb_used', 'local_gb_used', 'free_ram_mb',
              'free_disk_gb', 'current_workload', 'running_vms',
              'disk_available_least')
    results = {field: int(results[idx] or 0)
               for idx, field in enumerate(fields)}
    conn.close()
    return results
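

# Example (illustrative): the returned mapping always contains every field
# listed above, zero-filled when no compute nodes match:
#
#     stats = compute_node_statistics(ctxt)
#     free = stats['vcpus'] - stats['vcpus_used']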


###################


@pick_context_manager_writer
def certificate_create(context, values):
    certificate_ref = models.Certificate()
    for (key, value) in values.items():
        certificate_ref[key] = value
    certificate_ref.save(context.session)
    return certificate_ref


@pick_context_manager_reader
def certificate_get_all_by_project(context, project_id):
    return model_query(context, models.Certificate, read_deleted="no").\
        filter_by(project_id=project_id).\
        all()


@pick_context_manager_reader
def certificate_get_all_by_user(context, user_id):
    return model_query(context, models.Certificate, read_deleted="no").\
        filter_by(user_id=user_id).\
        all()


@pick_context_manager_reader
def certificate_get_all_by_user_and_project(context, user_id, project_id):
    return model_query(context, models.Certificate, read_deleted="no").\
        filter_by(user_id=user_id).\
        filter_by(project_id=project_id).\
        all()


###################


@require_context
@pick_context_manager_reader
def floating_ip_get(context, id):
    try:
        result = model_query(context, models.FloatingIp, project_only=True).\
            filter_by(id=id).\
            options(joinedload_all('fixed_ip.instance')).\
            first()

        if not result:
            raise exception.FloatingIpNotFound(id=id)

    except db_exc.DBError:
        LOG.warning("Invalid floating IP ID %s in request", id)
        raise exception.InvalidID(id=id)
    return result


@require_context
@pick_context_manager_reader
def floating_ip_get_pools(context):
    pools = []
    for result in model_query(context, models.FloatingIp,
                              (models.FloatingIp.pool,)).distinct():
        pools.append({'name': result[0]})
    return pools


@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
# retry_on_request=True is implied: oslo.db always retries on RetryRequest
@pick_context_manager_writer
def floating_ip_allocate_address(context, project_id, pool,
                                 auto_assigned=False):
    nova.context.authorize_project_context(context, project_id)
    floating_ip_ref = model_query(context, models.FloatingIp,
                                  read_deleted="no").\
        filter_by(fixed_ip_id=None).\
        filter_by(project_id=None).\
        filter_by(pool=pool).\
        first()

    if not floating_ip_ref:
        raise exception.NoMoreFloatingIps()

    params = {'project_id': project_id, 'auto_assigned': auto_assigned}

    rows_update = model_query(context, models.FloatingIp, read_deleted="no").\
        filter_by(id=floating_ip_ref['id']).\
        filter_by(fixed_ip_id=None).\
        filter_by(project_id=None).\
        filter_by(pool=pool).\
        update(params, synchronize_session='evaluate')

    if not rows_update:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another one')
        raise db_exc.RetryRequest(exception.FloatingIpAllocateFailed())

    return floating_ip_ref['address']


@require_context
@pick_context_manager_writer
def floating_ip_bulk_create(context, ips, want_result=True):
    try:
        tab = models.FloatingIp().__table__
        context.session.execute(tab.insert(), ips)
    except db_exc.DBDuplicateEntry as e:
        raise exception.FloatingIpExists(address=e.value)

    if want_result:
        return model_query(context, models.FloatingIp).filter(
            models.FloatingIp.address.in_(
                [ip['address'] for ip in ips])).all()


def _ip_range_splitter(ips, block_size=256):
    """Yields blocks of IPs no more than block_size elements long."""
    out = []
    count = 0
    for ip in ips:
        out.append(ip['address'])
        count += 1

        if count > block_size - 1:
            yield out
            out = []
            count = 0

    if out:
        yield out
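

# Example (illustrative): splitting 600 addresses with the default block
# size yields blocks of 256, 256 and 88 addresses:
#
#     ips = [{'address': '10.0.%d.%d' % (i // 256, i % 256)}
#            for i in range(600)]
#     [len(block) for block in _ip_range_splitter(ips)]   # [256, 256, 88]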


@require_context
@pick_context_manager_writer
def floating_ip_bulk_destroy(context, ips):
    project_id_to_quota_count = collections.defaultdict(int)
    for ip_block in _ip_range_splitter(ips):
        # Find any floating IPs that were not auto_assigned and
        # thus need quota released.
        query = model_query(context, models.FloatingIp).\
            filter(models.FloatingIp.address.in_(ip_block)).\
            filter_by(auto_assigned=False)
        for row in query.all():
            # The count is negative since we release quota by
            # reserving negative quota.
            project_id_to_quota_count[row['project_id']] -= 1
        # Delete the floating IPs.
        model_query(context, models.FloatingIp).\
            filter(models.FloatingIp.address.in_(ip_block)).\
            soft_delete(synchronize_session='fetch')


@require_context
@pick_context_manager_writer
def floating_ip_create(context, values):
    floating_ip_ref = models.FloatingIp()
    floating_ip_ref.update(values)
    try:
        floating_ip_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.FloatingIpExists(address=values['address'])
    return floating_ip_ref


def _floating_ip_count_by_project(context, project_id):
    nova.context.authorize_project_context(context, project_id)
    # TODO(tr3buchet): why leave auto_assigned floating IPs out?
    return model_query(context, models.FloatingIp, read_deleted="no").\
        filter_by(project_id=project_id).\
        filter_by(auto_assigned=False).\
        count()


@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_fixed_ip_associate(context, floating_address,
                                   fixed_address, host):
    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(address=fixed_address).\
        options(joinedload('network')).\
        first()
    if not fixed_ip_ref:
        raise exception.FixedIpNotFoundForAddress(address=fixed_address)
    rows = model_query(context, models.FloatingIp).\
        filter_by(address=floating_address).\
        filter(models.FloatingIp.project_id ==
               context.project_id).\
        filter(or_(models.FloatingIp.fixed_ip_id ==
                   fixed_ip_ref['id'],
                   models.FloatingIp.fixed_ip_id.is_(None))).\
        update({'fixed_ip_id': fixed_ip_ref['id'], 'host': host})

    if not rows:
        raise exception.FloatingIpAssociateFailed(address=floating_address)

    return fixed_ip_ref


@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_deallocate(context, address):
    return model_query(context, models.FloatingIp).\
        filter_by(address=address).\
        filter(and_(models.FloatingIp.project_id != null(),
                    models.FloatingIp.fixed_ip_id == null())).\
        update({'project_id': None,
                'host': None,
                'auto_assigned': False},
               synchronize_session=False)


@require_context
@pick_context_manager_writer
def floating_ip_destroy(context, address):
    model_query(context, models.FloatingIp).\
        filter_by(address=address).\
        delete()


@require_context
@pick_context_manager_writer
def floating_ip_disassociate(context, address):
    floating_ip_ref = model_query(context,
                                  models.FloatingIp).\
        filter_by(address=address).\
        first()
    if not floating_ip_ref:
        raise exception.FloatingIpNotFoundForAddress(address=address)

    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(id=floating_ip_ref['fixed_ip_id']).\
        options(joinedload('network')).\
        first()
    floating_ip_ref.fixed_ip_id = None
    floating_ip_ref.host = None

    return fixed_ip_ref


def _floating_ip_get_all(context):
    return model_query(context, models.FloatingIp, read_deleted="no")


@pick_context_manager_reader
def floating_ip_get_all(context):
    floating_ip_refs = _floating_ip_get_all(context).\
        options(joinedload('fixed_ip')).\
        all()
    if not floating_ip_refs:
        raise exception.NoFloatingIpsDefined()
    return floating_ip_refs


@pick_context_manager_reader
def floating_ip_get_all_by_host(context, host):
    floating_ip_refs = _floating_ip_get_all(context).\
        filter_by(host=host).\
        options(joinedload('fixed_ip')).\
        all()
    if not floating_ip_refs:
        raise exception.FloatingIpNotFoundForHost(host=host)
    return floating_ip_refs


@require_context
@pick_context_manager_reader
def floating_ip_get_all_by_project(context, project_id):
    nova.context.authorize_project_context(context, project_id)
    # TODO(tr3buchet): why do we not want auto_assigned floating IPs here?
    return _floating_ip_get_all(context).\
        filter_by(project_id=project_id).\
        filter_by(auto_assigned=False).\
        options(joinedload_all('fixed_ip.instance')).\
        all()


@require_context
@pick_context_manager_reader
def floating_ip_get_by_address(context, address):
    return _floating_ip_get_by_address(context, address)


def _floating_ip_get_by_address(context, address):
    # if address string is empty explicitly set it to None
    if not address:
        address = None

    try:
        result = model_query(context, models.FloatingIp).\
            filter_by(address=address).\
            options(joinedload_all('fixed_ip.instance')).\
            first()

        if not result:
            raise exception.FloatingIpNotFoundForAddress(address=address)
    except db_exc.DBError:
        msg = _("Invalid floating IP %s in request") % address
        LOG.warning(msg)
        raise exception.InvalidIpAddressError(msg)

    # If the floating IP has a project ID set, check to make sure
    # the non-admin user has access.
    if result.project_id and nova.context.is_user_context(context):
        nova.context.authorize_project_context(context, result.project_id)

    return result


@require_context
@pick_context_manager_reader
def floating_ip_get_by_fixed_address(context, fixed_address):
    return model_query(context, models.FloatingIp).\
        outerjoin(models.FixedIp,
                  models.FixedIp.id ==
                  models.FloatingIp.fixed_ip_id).\
        filter(models.FixedIp.address == fixed_address).\
        all()


@require_context
@pick_context_manager_reader
def floating_ip_get_by_fixed_ip_id(context, fixed_ip_id):
    return model_query(context, models.FloatingIp).\
        filter_by(fixed_ip_id=fixed_ip_id).\
        all()


@require_context
@pick_context_manager_writer
def floating_ip_update(context, address, values):
    float_ip_ref = _floating_ip_get_by_address(context, address)
    float_ip_ref.update(values)
    try:
        float_ip_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.FloatingIpExists(address=values['address'])
    return float_ip_ref


###################


@require_context
@pick_context_manager_reader
def dnsdomain_get(context, fqdomain):
    return model_query(context, models.DNSDomain, read_deleted="no").\
        filter_by(domain=fqdomain).\
        with_lockmode('update').\
        first()


def _dnsdomain_get_or_create(context, fqdomain):
    domain_ref = dnsdomain_get(context, fqdomain)
    if not domain_ref:
        dns_ref = models.DNSDomain()
        dns_ref.update({'domain': fqdomain,
                        'availability_zone': None,
                        'project_id': None})
        return dns_ref

    return domain_ref


@pick_context_manager_writer
def dnsdomain_register_for_zone(context, fqdomain, zone):
    domain_ref = _dnsdomain_get_or_create(context, fqdomain)
    domain_ref.scope = 'private'
    domain_ref.availability_zone = zone
    context.session.add(domain_ref)


@pick_context_manager_writer
def dnsdomain_register_for_project(context, fqdomain, project):
    domain_ref = _dnsdomain_get_or_create(context, fqdomain)
    domain_ref.scope = 'public'
    domain_ref.project_id = project
    context.session.add(domain_ref)


@pick_context_manager_writer
def dnsdomain_unregister(context, fqdomain):
    model_query(context, models.DNSDomain).\
        filter_by(domain=fqdomain).\
        delete()


@pick_context_manager_reader
def dnsdomain_get_all(context):
    return model_query(context, models.DNSDomain, read_deleted="no").all()


###################


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
# retry_on_request=True is implied: oslo.db always retries on RetryRequest
@pick_context_manager_writer
def fixed_ip_associate(context, address, instance_uuid, network_id=None,
                       reserved=False, virtual_interface_id=None):
    """Keyword arguments:
    reserved -- should be a boolean value (True or False); the exact value
                will be used to filter on the fixed IP address
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    network_or_none = or_(models.FixedIp.network_id == network_id,
                          models.FixedIp.network_id == null())
    fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
        filter(network_or_none).\
        filter_by(reserved=reserved).\
        filter_by(address=address).\
        first()

    if fixed_ip_ref is None:
        raise exception.FixedIpNotFoundForNetwork(address=address,
                                                  network_uuid=network_id)
    if fixed_ip_ref.instance_uuid:
        raise exception.FixedIpAlreadyInUse(address=address,
                                            instance_uuid=instance_uuid)

    params = {'instance_uuid': instance_uuid,
              'allocated': virtual_interface_id is not None}
    if not fixed_ip_ref.network_id:
        params['network_id'] = network_id
    if virtual_interface_id:
        params['virtual_interface_id'] = virtual_interface_id

    rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(id=fixed_ip_ref.id).\
        filter(network_or_none).\
        filter_by(reserved=reserved).\
        filter_by(address=address).\
        update(params, synchronize_session='evaluate')

    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.FixedIpAssociateFailed(net=network_id))

    return fixed_ip_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
# retry_on_request=True is implied: oslo.db always retries on RetryRequest
@pick_context_manager_writer
def fixed_ip_associate_pool(context, network_id, instance_uuid=None,
                            host=None, virtual_interface_id=None):
    """Allocate a fixed IP out of a fixed IP network pool.

    This allocates an unallocated fixed IP out of a specified
    network. We sort by updated_at to hand out the oldest address in
    the list.
    """
    if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    network_or_none = or_(models.FixedIp.network_id == network_id,
                          models.FixedIp.network_id == null())
    fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
        filter(network_or_none).\
        filter_by(reserved=False).\
        filter_by(instance_uuid=None).\
        filter_by(host=None).\
        filter_by(leased=False).\
        order_by(asc(models.FixedIp.updated_at)).\
        first()

    if not fixed_ip_ref:
        raise exception.NoMoreFixedIps(net=network_id)

    params = {'allocated': virtual_interface_id is not None}
    if fixed_ip_ref['network_id'] is None:
        params['network_id'] = network_id
    if instance_uuid:
        params['instance_uuid'] = instance_uuid
    if host:
        params['host'] = host
    if virtual_interface_id:
        params['virtual_interface_id'] = virtual_interface_id

    rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(id=fixed_ip_ref['id']).\
        filter_by(network_id=fixed_ip_ref['network_id']).\
        filter_by(reserved=False).\
        filter_by(instance_uuid=None).\
        filter_by(host=None).\
        filter_by(leased=False).\
        filter_by(address=fixed_ip_ref['address']).\
        update(params, synchronize_session='evaluate')

    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.FixedIpAssociateFailed(net=network_id))

    return fixed_ip_ref


@require_context
@pick_context_manager_writer
def fixed_ip_create(context, values):
    fixed_ip_ref = models.FixedIp()
    fixed_ip_ref.update(values)
    try:
        fixed_ip_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.FixedIpExists(address=values['address'])
    return fixed_ip_ref


@require_context
@pick_context_manager_writer
def fixed_ip_bulk_create(context, ips):
    try:
        tab = models.FixedIp.__table__
        context.session.execute(tab.insert(), ips)
    except db_exc.DBDuplicateEntry as e:
        raise exception.FixedIpExists(address=e.value)


@require_context
@pick_context_manager_writer
def fixed_ip_disassociate(context, address):
    _fixed_ip_get_by_address(context, address).update(
        {'instance_uuid': None,
         'virtual_interface_id': None})


@pick_context_manager_writer
def fixed_ip_disassociate_all_by_timeout(context, host, time):
    # NOTE(vish): only update fixed ips that "belong" to this
    #             host; i.e. the network host or the instance
    #             host matches. Two queries necessary because
    #             join with update doesn't work.
    host_filter = or_(and_(models.Instance.host == host,
                           models.Network.multi_host == true()),
                      models.Network.host == host)
    result = model_query(context, models.FixedIp, (models.FixedIp.id,),
                         read_deleted="no").\
        filter(models.FixedIp.allocated == false()).\
        filter(models.FixedIp.updated_at < time).\
        join((models.Network,
              models.Network.id == models.FixedIp.network_id)).\
        join((models.Instance,
              models.Instance.uuid == models.FixedIp.instance_uuid)).\
        filter(host_filter).\
        all()
    fixed_ip_ids = [fip[0] for fip in result]
    if not fixed_ip_ids:
        return 0
    result = model_query(context, models.FixedIp).\
        filter(models.FixedIp.id.in_(fixed_ip_ids)).\
        update({'instance_uuid': None,
                'leased': False,
                'updated_at': timeutils.utcnow()},
               synchronize_session='fetch')
    return result


@require_context
@pick_context_manager_reader
def fixed_ip_get(context, id, get_network=False):
    query = model_query(context, models.FixedIp).filter_by(id=id)
    if get_network:
        query = query.options(joinedload('network'))

    result = query.first()
    if not result:
        raise exception.FixedIpNotFound(id=id)

    # FIXME(sirp): shouldn't we just use project_only here to restrict the
    # results?
    if (nova.context.is_user_context(context) and
            result['instance_uuid'] is not None):
        instance = instance_get_by_uuid(context.elevated(read_deleted='yes'),
                                        result['instance_uuid'])
        nova.context.authorize_project_context(context, instance.project_id)

    return result


@pick_context_manager_reader
def fixed_ip_get_all(context):
    result = model_query(context, models.FixedIp, read_deleted="yes").all()
    if not result:
        raise exception.NoFixedIpsDefined()

    return result


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_address(context, address, columns_to_join=None):
    return _fixed_ip_get_by_address(context, address,
                                    columns_to_join=columns_to_join)


def _fixed_ip_get_by_address(context, address, columns_to_join=None):
    if columns_to_join is None:
        columns_to_join = []

    try:
        result = model_query(context, models.FixedIp)
        for column in columns_to_join:
            result = result.options(joinedload_all(column))
        result = result.filter_by(address=address).first()
        if not result:
            raise exception.FixedIpNotFoundForAddress(address=address)
    except db_exc.DBError:
        msg = _("Invalid fixed IP address %s in request") % address
        LOG.warning(msg)
        raise exception.FixedIpInvalid(msg)

    # NOTE(sirp): shouldn't we just use project_only here to restrict the
    # results?
    if (nova.context.is_user_context(context) and
            result['instance_uuid'] is not None):
        instance = _instance_get_by_uuid(
            context.elevated(read_deleted='yes'),
            result['instance_uuid'])
        nova.context.authorize_project_context(context,
                                               instance.project_id)

    return result


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_floating_address(context, floating_address):
    return model_query(context, models.FixedIp).\
        join(models.FloatingIp,
             models.FloatingIp.fixed_ip_id ==
             models.FixedIp.id).\
        filter(models.FloatingIp.address == floating_address).\
        first()
    # NOTE(tr3buchet) please don't invent an exception here, None is fine


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_instance(context, instance_uuid):
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    vif_and = and_(models.VirtualInterface.id ==
                   models.FixedIp.virtual_interface_id,
                   models.VirtualInterface.deleted == 0)
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(instance_uuid=instance_uuid).\
        outerjoin(models.VirtualInterface, vif_and).\
        options(contains_eager("virtual_interface")).\
        options(joinedload('network')).\
        options(joinedload('floating_ips')).\
        order_by(asc(models.VirtualInterface.created_at),
                 asc(models.VirtualInterface.id)).\
        all()

    if not result:
        raise exception.FixedIpNotFoundForInstance(
            instance_uuid=instance_uuid)

    return result


@pick_context_manager_reader
def fixed_ip_get_by_host(context, host):
    instance_uuids = _instance_get_all_uuids_by_host(context, host)
    if not instance_uuids:
        return []

    return model_query(context, models.FixedIp).\
        filter(models.FixedIp.instance_uuid.in_(instance_uuids)).\
        all()


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_network_host(context, network_id, host):
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(network_id=network_id).\
        filter_by(host=host).\
        first()

    if not result:
        raise exception.FixedIpNotFoundForNetworkHost(network_id=network_id,
                                                      host=host)
    return result


@require_context
@pick_context_manager_reader
def fixed_ips_by_virtual_interface(context, vif_id):
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(virtual_interface_id=vif_id).\
        options(joinedload('network')).\
        options(joinedload('floating_ips')).\
        all()

    return result


@require_context
@pick_context_manager_writer
def fixed_ip_update(context, address, values):
    _fixed_ip_get_by_address(context, address).update(values)


def _fixed_ip_count_by_project(context, project_id):
    nova.context.authorize_project_context(context, project_id)
    return model_query(context, models.FixedIp, (models.FixedIp.id,),
                       read_deleted="no").\
        join((models.Instance,
              models.Instance.uuid == models.FixedIp.instance_uuid)).\
        filter(models.Instance.project_id == project_id).\
        count()


###################


@require_context
@pick_context_manager_writer
def virtual_interface_create(context, values):
    """Create a new virtual interface record in the database.

    :param values: dict containing column values
    """
    try:
        vif_ref = models.VirtualInterface()
        vif_ref.update(values)
        vif_ref.save(context.session)
    except db_exc.DBError:
        LOG.exception("VIF creation failed with a database error.")
        raise exception.VirtualInterfaceCreateException()

    return vif_ref


def _virtual_interface_query(context):
    return model_query(context, models.VirtualInterface, read_deleted="no")


@require_context
@pick_context_manager_writer
def virtual_interface_update(context, address, values):
    vif_ref = virtual_interface_get_by_address(context, address)
    vif_ref.update(values)
    vif_ref.save(context.session)
    return vif_ref


@require_context
@pick_context_manager_reader
def virtual_interface_get(context, vif_id):
    """Gets a virtual interface from the table.

    :param vif_id: id of the virtual interface
    """
    vif_ref = _virtual_interface_query(context).\
        filter_by(id=vif_id).\
        first()
    return vif_ref


@require_context
@pick_context_manager_reader
def virtual_interface_get_by_address(context, address):
    """Gets a virtual interface from the table.

    :param address: the address of the interface you're looking to get
    """
    try:
        vif_ref = _virtual_interface_query(context).\
            filter_by(address=address).\
            first()
    except db_exc.DBError:
        msg = _("Invalid virtual interface address %s in request") % address
        LOG.warning(msg)
        raise exception.InvalidIpAddressError(msg)
    return vif_ref


@require_context
@pick_context_manager_reader
def virtual_interface_get_by_uuid(context, vif_uuid):
    """Gets a virtual interface from the table.

    :param vif_uuid: the uuid of the interface you're looking to get
    """
    vif_ref = _virtual_interface_query(context).\
        filter_by(uuid=vif_uuid).\
        first()
    return vif_ref


@require_context
@require_instance_exists_using_uuid
@pick_context_manager_reader_allow_async
def virtual_interface_get_by_instance(context, instance_uuid):
    """Gets all virtual interfaces for instance.

    :param instance_uuid: uuid of the instance to retrieve vifs for
    """
    vif_refs = _virtual_interface_query(context).\
        filter_by(instance_uuid=instance_uuid).\
        order_by(asc("created_at"), asc("id")).\
        all()
    return vif_refs


@require_context
@pick_context_manager_reader
def virtual_interface_get_by_instance_and_network(context, instance_uuid,
                                                  network_id):
    """Gets virtual interface for instance that's associated with network."""
    vif_ref = _virtual_interface_query(context).\
        filter_by(instance_uuid=instance_uuid).\
        filter_by(network_id=network_id).\
        first()
    return vif_ref


@require_context
@pick_context_manager_writer
def virtual_interface_delete_by_instance(context, instance_uuid):
    """Delete virtual interface records that are associated
    with the instance given by instance_uuid.

    :param instance_uuid: uuid of instance
    """
    _virtual_interface_query(context).\
        filter_by(instance_uuid=instance_uuid).\
        soft_delete()


@require_context
@pick_context_manager_writer
def virtual_interface_delete(context, id):
    """Delete virtual interface records.

    :param id: id of the interface
    """
    _virtual_interface_query(context).\
        filter_by(id=id).\
        soft_delete()


@require_context
@pick_context_manager_reader
def virtual_interface_get_all(context):
    """Get all vifs."""
    vif_refs = _virtual_interface_query(context).all()
    return vif_refs


###################


def _metadata_refs(metadata_dict, meta_class):
    metadata_refs = []
    if metadata_dict:
        for k, v in metadata_dict.items():
            metadata_ref = meta_class()
            metadata_ref['key'] = k
            metadata_ref['value'] = v
            metadata_refs.append(metadata_ref)
    return metadata_refs
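

# Example (illustrative): a plain metadata dict becomes a list of model rows
# ready to be attached to an instance by instance_create() below:
#
#     refs = _metadata_refs({'role': 'webserver'}, models.InstanceMetadata)
#     # one InstanceMetadata row with key='role', value='webserver'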
def _validate_unique_server_name(context, name): | |
if not CONF.osapi_compute_unique_server_name_scope: | |
return | |
lowername = name.lower() | |
base_query = model_query(context, models.Instance, read_deleted='no').\ | |
filter(func.lower(models.Instance.hostname) == lowername) | |
if CONF.osapi_compute_unique_server_name_scope == 'project': | |
instance_with_same_name = base_query.\ | |
filter_by(project_id=context.project_id).\ | |
count() | |
elif CONF.osapi_compute_unique_server_name_scope == 'global': | |
instance_with_same_name = base_query.count() | |
else: | |
return | |
if instance_with_same_name > 0: | |
raise exception.InstanceExists(name=lowername) | |
def _handle_objects_related_type_conversions(values): | |
"""Make sure that certain things in values (which may have come from | |
an objects.instance.Instance object) are in suitable form for the | |
database. | |
""" | |
# NOTE(danms): Make sure IP addresses are passed as strings to | |
# the database engine | |
for key in ('access_ip_v4', 'access_ip_v6'): | |
if key in values and values[key] is not None: | |
values[key] = str(values[key]) | |
datetime_keys = ('created_at', 'deleted_at', 'updated_at', | |
'launched_at', 'terminated_at') | |
convert_objects_related_datetimes(values, *datetime_keys) | |
def _check_instance_exists_in_project(context, instance_uuid): | |
if not model_query(context, models.Instance, read_deleted="no", | |
project_only=True).filter_by( | |
uuid=instance_uuid).first(): | |
raise exception.InstanceNotFound(instance_id=instance_uuid) | |
@require_context | |
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) | |
@pick_context_manager_writer | |
def instance_create(context, values): | |
"""Create a new Instance record in the database. | |
context - request context object | |
values - dict containing column values. | |
""" | |
security_group_ensure_default(context) | |
values = values.copy() | |
values['metadata'] = _metadata_refs( | |
values.get('metadata'), models.InstanceMetadata) | |
values['system_metadata'] = _metadata_refs( | |
values.get('system_metadata'), models.InstanceSystemMetadata) | |
_handle_objects_related_type_conversions(values) | |
instance_ref = models.Instance() | |
if not values.get('uuid'): | |
values['uuid'] = uuidutils.generate_uuid() | |
instance_ref['info_cache'] = models.InstanceInfoCache() | |
info_cache = values.pop('info_cache', None) | |
if info_cache is not None: | |
instance_ref['info_cache'].update(info_cache) | |
security_groups = values.pop('security_groups', []) | |
instance_ref['extra'] = models.InstanceExtra() | |
instance_ref['extra'].update( | |
{'numa_topology': None, | |
'pci_requests': None, | |
'vcpu_model': None, | |
}) | |
instance_ref['extra'].update(values.pop('extra', {})) | |
instance_ref.update(values) | |
def _get_sec_group_models(security_groups): | |
models = [] | |
default_group = _security_group_ensure_default(context) | |
if 'default' in security_groups: | |
models.append(default_group) | |
# Generate a new list, so we don't modify the original | |
security_groups = [x for x in security_groups if x != 'default'] | |
if security_groups: | |
models.extend(_security_group_get_by_names( | |
context, security_groups)) | |
return models | |
if 'hostname' in values: | |
_validate_unique_server_name(context, values['hostname']) | |
instance_ref.security_groups = _get_sec_group_models(security_groups) | |
context.session.add(instance_ref) | |
# create the instance uuid to ec2_id mapping entry for instance | |
ec2_instance_create(context, instance_ref['uuid']) | |
return instance_ref | |
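# Illustrative call (hypothetical values, assuming a request context
# `ctxt`):
#     instance_create(ctxt, {'project_id': ctxt.project_id,
#                            'metadata': {'role': 'web'},
#                            'security_groups': ['default']})
# A uuid is generated when one is not supplied, and the related
# info_cache, extra, metadata and security group rows are created in
# the same transaction.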
def _instance_data_get_for_user(context, project_id, user_id): | |
result = model_query(context, models.Instance, ( | |
func.count(models.Instance.id), | |
func.sum(models.Instance.vcpus), | |
func.sum(models.Instance.memory_mb))).\ | |
filter_by(project_id=project_id) | |
if user_id: | |
result = result.filter_by(user_id=user_id).first() | |
else: | |
result = result.first() | |
# NOTE(vish): convert None to 0 | |
return (result[0] or 0, result[1] or 0, result[2] or 0) | |
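# Illustrative example: _instance_data_get_for_user(ctxt, 'proj1', None)
# returns (instance_count, total_vcpus, total_memory_mb) for the whole
# project, with NULL sums coerced to 0.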
@require_context | |
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) | |
@pick_context_manager_writer | |
def instance_destroy(context, instance_uuid, constraint=None): | |
if uuidutils.is_uuid_like(instance_uuid): | |
instance_ref = _instance_get_by_uuid(context, instance_uuid) | |
else: | |
        raise exception.InvalidUUID(uuid=instance_uuid)
query = model_query(context, models.Instance).\ | |
filter_by(uuid=instance_uuid) | |
if constraint is not None: | |
query = constraint.apply(models.Instance, query) | |
count = query.soft_delete() | |
if count == 0: | |
raise exception.ConstraintNotMet() | |
model_query(context, models.SecurityGroupInstanceAssociation).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.InstanceInfoCache).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.InstanceMetadata).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.InstanceFault).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.InstanceExtra).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.InstanceSystemMetadata).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.InstanceGroupMember).\ | |
filter_by(instance_id=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.BlockDeviceMapping).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
model_query(context, models.Migration).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
# NOTE(snikitin): We can't use model_query here, because there is no | |
# column 'deleted' in 'tags' table. | |
context.session.query(models.Tag).filter_by( | |
resource_id=instance_uuid).delete() | |
context.session.query(models.ConsoleAuthToken).filter_by( | |
instance_uuid=instance_uuid).delete() | |
# NOTE(cfriesen): We intentionally do not soft-delete entries in the | |
# instance_actions or instance_actions_events tables because they | |
# can be used by operators to find out what actions were performed on a | |
# deleted instance. Both of these tables are special-cased in | |
# _archive_deleted_rows_for_table(). | |
return instance_ref | |
@require_context | |
@pick_context_manager_reader_allow_async | |
def instance_get_by_uuid(context, uuid, columns_to_join=None): | |
return _instance_get_by_uuid(context, uuid, | |
columns_to_join=columns_to_join) | |
def _instance_get_by_uuid(context, uuid, columns_to_join=None): | |
result = _build_instance_get(context, columns_to_join=columns_to_join).\ | |
filter_by(uuid=uuid).\ | |
first() | |
if not result: | |
raise exception.InstanceNotFound(instance_id=uuid) | |
return result | |
@require_context | |
@pick_context_manager_reader | |
def instance_get(context, instance_id, columns_to_join=None): | |
try: | |
result = _build_instance_get(context, columns_to_join=columns_to_join | |
).filter_by(id=instance_id).first() | |
if not result: | |
raise exception.InstanceNotFound(instance_id=instance_id) | |
return result | |
except db_exc.DBError: | |
# NOTE(sdague): catch all in case the db engine chokes on the | |
# id because it's too long of an int to store. | |
LOG.warning("Invalid instance id %s in request", instance_id) | |
raise exception.InvalidID(id=instance_id) | |
def _build_instance_get(context, columns_to_join=None): | |
query = model_query(context, models.Instance, project_only=True).\ | |
options(joinedload_all('security_groups.rules')).\ | |
options(joinedload('info_cache')) | |
if columns_to_join is None: | |
columns_to_join = ['metadata', 'system_metadata'] | |
for column in columns_to_join: | |
if column in ['info_cache', 'security_groups']: | |
# Already always joined above | |
continue | |
if 'extra.' in column: | |
query = query.options(undefer(column)) | |
else: | |
query = query.options(joinedload(column)) | |
# NOTE(alaski) Stop lazy loading of columns not needed. | |
for col in ['metadata', 'system_metadata']: | |
if col not in columns_to_join: | |
query = query.options(noload(col)) | |
return query | |
def _instances_fill_metadata(context, instances, manual_joins=None): | |
"""Selectively fill instances with manually-joined metadata. Note that | |
instance will be converted to a dict. | |
:param context: security context | |
:param instances: list of instances to fill | |
:param manual_joins: list of tables to manually join (can be any | |
combination of 'metadata' and 'system_metadata' or | |
None to take the default of both) | |
""" | |
uuids = [inst['uuid'] for inst in instances] | |
if manual_joins is None: | |
manual_joins = ['metadata', 'system_metadata'] | |
meta = collections.defaultdict(list) | |
if 'metadata' in manual_joins: | |
for row in _instance_metadata_get_multi(context, uuids): | |
meta[row['instance_uuid']].append(row) | |
sys_meta = collections.defaultdict(list) | |
if 'system_metadata' in manual_joins: | |
for row in _instance_system_metadata_get_multi(context, uuids): | |
sys_meta[row['instance_uuid']].append(row) | |
pcidevs = collections.defaultdict(list) | |
if 'pci_devices' in manual_joins: | |
for row in _instance_pcidevs_get_multi(context, uuids): | |
pcidevs[row['instance_uuid']].append(row) | |
filled_instances = [] | |
for inst in instances: | |
inst = dict(inst) | |
inst['system_metadata'] = sys_meta[inst['uuid']] | |
inst['metadata'] = meta[inst['uuid']] | |
if 'pci_devices' in manual_joins: | |
inst['pci_devices'] = pcidevs[inst['uuid']] | |
filled_instances.append(inst) | |
return filled_instances | |
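# Illustrative call (assuming a request context `ctxt` and a list of
# Instance rows `rows`):
#     _instances_fill_metadata(ctxt, rows, manual_joins=['metadata'])
# returns dicts whose 'metadata' lists are filled from one batched
# query, while 'system_metadata' is left as an empty list.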
def _manual_join_columns(columns_to_join): | |
"""Separate manually joined columns from columns_to_join | |
If columns_to_join contains 'metadata', 'system_metadata', or | |
'pci_devices' those columns are removed from columns_to_join and added | |
to a manual_joins list to be used with the _instances_fill_metadata method. | |
The columns_to_join formal parameter is copied and not modified, the return | |
tuple has the modified columns_to_join list to be used with joinedload in | |
a model query. | |
:param:columns_to_join: List of columns to join in a model query. | |
:return: tuple of (manual_joins, columns_to_join) | |
""" | |
manual_joins = [] | |
columns_to_join_new = copy.copy(columns_to_join) | |
for column in ('metadata', 'system_metadata', 'pci_devices'): | |
if column in columns_to_join_new: | |
columns_to_join_new.remove(column) | |
manual_joins.append(column) | |
return manual_joins, columns_to_join_new | |
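# Illustrative example:
#     _manual_join_columns(['metadata', 'info_cache'])
# returns (['metadata'], ['info_cache']): 'metadata' is split out for
# the batched fill in _instances_fill_metadata(), while 'info_cache'
# stays in the list handed to joinedload().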
@require_context | |
@pick_context_manager_reader | |
def instance_get_all(context, columns_to_join=None): | |
if columns_to_join is None: | |
columns_to_join_new = ['info_cache', 'security_groups'] | |
manual_joins = ['metadata', 'system_metadata'] | |
else: | |
manual_joins, columns_to_join_new = ( | |
_manual_join_columns(columns_to_join)) | |
query = model_query(context, models.Instance) | |
for column in columns_to_join_new: | |
query = query.options(joinedload(column)) | |
if not context.is_admin: | |
        # If we're not an admin context, add the appropriate filter.
if context.project_id: | |
query = query.filter_by(project_id=context.project_id) | |
else: | |
query = query.filter_by(user_id=context.user_id) | |
instances = query.all() | |
return _instances_fill_metadata(context, instances, manual_joins) | |
@require_context | |
@pick_context_manager_reader_allow_async | |
def instance_get_all_by_filters(context, filters, sort_key, sort_dir, | |
limit=None, marker=None, columns_to_join=None): | |
"""Return instances matching all filters sorted by the primary key. | |
See instance_get_all_by_filters_sort for more information. | |
""" | |
# Invoke the API with the multiple sort keys and directions using the | |
# single sort key/direction | |
return instance_get_all_by_filters_sort(context, filters, limit=limit, | |
marker=marker, | |
columns_to_join=columns_to_join, | |
sort_keys=[sort_key], | |
sort_dirs=[sort_dir]) | |
@require_context | |
@pick_context_manager_reader_allow_async | |
def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None, | |
columns_to_join=None, sort_keys=None, | |
sort_dirs=None): | |
"""Return instances that match all filters sorted by the given keys. | |
Deleted instances will be returned by default, unless there's a filter that | |
says otherwise. | |
    Depending on the name of a filter, matching for that filter is
    performed using either exact matching or regular expression
    matching. Exact matching is applied for the following filters::
    | ['project_id', 'user_id', 'image_ref',
    |  'vm_state', 'instance_type_id', 'uuid',
    |  'metadata', 'host', 'task_state', 'system_metadata']
    A third type of filter (also using exact matching) filters
based on instance metadata tags when supplied under a special | |
key named 'filter':: | |
| filters = { | |
| 'filter': [ | |
| {'name': 'tag-key', 'value': '<metakey>'}, | |
| {'name': 'tag-value', 'value': '<metaval>'}, | |
| {'name': 'tag:<metakey>', 'value': '<metaval>'} | |
| ] | |
| } | |
    Special keys are used to tweak the query further::
| 'changes-since' - only return instances updated after | |
| 'deleted' - only return (or exclude) deleted instances | |
| 'soft_deleted' - modify behavior of 'deleted' to either | |
| include or exclude instances whose | |
| vm_state is SOFT_DELETED. | |
    A fourth type of filter (also using exact matching) filters
    based on instance tags (not metadata tags). There are four types
    of these tags:
    `tags` -- One or more strings that will be used to filter results
    in an AND expression: T1 AND T2
    `tags-any` -- One or more strings that will be used to filter results in
    an OR expression: T1 OR T2
    `not-tags` -- One or more strings that will be used to filter results in
    a NOT AND expression: NOT (T1 AND T2)
    `not-tags-any` -- One or more strings that will be used to filter results
    in a NOT OR expression: NOT (T1 OR T2)
    Tags should be represented as a list::
    | filters = {
    |     'tags': [some-tag, some-another-tag],
    |     'tags-any': [some-any-tag, some-another-any-tag],
    |     'not-tags': [some-not-tag, some-another-not-tag],
    |     'not-tags-any': [some-not-any-tag, some-another-not-any-tag]
    | }
""" | |
# NOTE(mriedem): If the limit is 0 there is no point in even going | |
# to the database since nothing is going to be returned anyway. | |
if limit == 0: | |
return [] | |
sort_keys, sort_dirs = process_sort_params(sort_keys, | |
sort_dirs, | |
default_dir='desc') | |
if columns_to_join is None: | |
columns_to_join_new = ['info_cache', 'security_groups'] | |
manual_joins = ['metadata', 'system_metadata'] | |
else: | |
manual_joins, columns_to_join_new = ( | |
_manual_join_columns(columns_to_join)) | |
query_prefix = context.session.query(models.Instance) | |
for column in columns_to_join_new: | |
if 'extra.' in column: | |
query_prefix = query_prefix.options(undefer(column)) | |
else: | |
query_prefix = query_prefix.options(joinedload(column)) | |
    # NOTE: ordering is applied by sqlalchemyutils.paginate_query()
    # below; no need to do it here as well.
# Make a copy of the filters dictionary to use going forward, as we'll | |
# be modifying it and we shouldn't affect the caller's use of it. | |
filters = copy.deepcopy(filters) | |
if 'changes-since' in filters: | |
changes_since = timeutils.normalize_time(filters['changes-since']) | |
query_prefix = query_prefix.\ | |
filter(models.Instance.updated_at >= changes_since) | |
if 'deleted' in filters: | |
# Instances can be soft or hard deleted and the query needs to | |
# include or exclude both | |
deleted = filters.pop('deleted') | |
if deleted: | |
if filters.pop('soft_deleted', True): | |
delete = or_( | |
models.Instance.deleted == models.Instance.id, | |
models.Instance.vm_state == vm_states.SOFT_DELETED | |
) | |
query_prefix = query_prefix.\ | |
filter(delete) | |
else: | |
query_prefix = query_prefix.\ | |
filter(models.Instance.deleted == models.Instance.id) | |
else: | |
query_prefix = query_prefix.\ | |
filter_by(deleted=0) | |
if not filters.pop('soft_deleted', False): | |
# It would be better to have vm_state not be nullable | |
# but until then we test it explicitly as a workaround. | |
not_soft_deleted = or_( | |
models.Instance.vm_state != vm_states.SOFT_DELETED, | |
models.Instance.vm_state == null() | |
) | |
query_prefix = query_prefix.filter(not_soft_deleted) | |
if 'cleaned' in filters: | |
cleaned = 1 if filters.pop('cleaned') else 0 | |
query_prefix = query_prefix.filter(models.Instance.cleaned == cleaned) | |
if 'tags' in filters: | |
tags = filters.pop('tags') | |
# We build a JOIN ladder expression for each tag, JOIN'ing | |
# the first tag to the instances table, and each subsequent | |
# tag to the last JOIN'd tags table | |
first_tag = tags.pop(0) | |
query_prefix = query_prefix.join(models.Instance.tags) | |
query_prefix = query_prefix.filter(models.Tag.tag == first_tag) | |
for tag in tags: | |
tag_alias = aliased(models.Tag) | |
query_prefix = query_prefix.join(tag_alias, | |
models.Instance.tags) | |
query_prefix = query_prefix.filter(tag_alias.tag == tag) | |
if 'tags-any' in filters: | |
tags = filters.pop('tags-any') | |
tag_alias = aliased(models.Tag) | |
query_prefix = query_prefix.join(tag_alias, models.Instance.tags) | |
query_prefix = query_prefix.filter(tag_alias.tag.in_(tags)) | |
if 'not-tags' in filters: | |
tags = filters.pop('not-tags') | |
first_tag = tags.pop(0) | |
subq = query_prefix.session.query(models.Tag.resource_id) | |
subq = subq.join(models.Instance.tags) | |
subq = subq.filter(models.Tag.tag == first_tag) | |
for tag in tags: | |
tag_alias = aliased(models.Tag) | |
subq = subq.join(tag_alias, models.Instance.tags) | |
subq = subq.filter(tag_alias.tag == tag) | |
query_prefix = query_prefix.filter(~models.Instance.uuid.in_(subq)) | |
if 'not-tags-any' in filters: | |
tags = filters.pop('not-tags-any') | |
query_prefix = query_prefix.filter(~models.Instance.tags.any( | |
models.Tag.tag.in_(tags))) | |
if not context.is_admin: | |
        # If we're not an admin context, add the appropriate filter.
if context.project_id: | |
filters['project_id'] = context.project_id | |
else: | |
filters['user_id'] = context.user_id | |
# Filters for exact matches that we can do along with the SQL query... | |
# For other filters that don't match this, we will do regexp matching | |
exact_match_filter_names = ['project_id', 'user_id', 'image_ref', | |
'vm_state', 'instance_type_id', 'uuid', | |
'metadata', 'host', 'task_state', | |
'system_metadata'] | |
# Filter the query | |
query_prefix = _exact_instance_filter(query_prefix, | |
filters, exact_match_filter_names) | |
if query_prefix is None: | |
return [] | |
query_prefix = _regex_instance_filter(query_prefix, filters) | |
# paginate query | |
if marker is not None: | |
try: | |
marker = _instance_get_by_uuid( | |
context.elevated(read_deleted='yes'), marker) | |
except exception.InstanceNotFound: | |
raise exception.MarkerNotFound(marker=marker) | |
try: | |
query_prefix = sqlalchemyutils.paginate_query(query_prefix, | |
models.Instance, limit, | |
sort_keys, | |
marker=marker, | |
sort_dirs=sort_dirs) | |
except db_exc.InvalidSortKey: | |
raise exception.InvalidSortKey() | |
return _instances_fill_metadata(context, query_prefix.all(), manual_joins) | |
def _db_connection_type(db_connection): | |
"""Returns a lowercase symbol for the db type. | |
This is useful when we need to change what we are doing per DB | |
(like handling regexes). In a CellsV2 world it probably needs to | |
do something better than use the database configuration string. | |
""" | |
db_string = db_connection.split(':')[0].split('+')[0] | |
return db_string.lower() | |
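# Illustrative example:
#     _db_connection_type('mysql+pymysql://nova:pass@127.0.0.1/nova')
# returns 'mysql': the driver suffix after '+' and everything after the
# first ':' are discarded.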
def _safe_regex_mysql(raw_string): | |
"""Make regex safe to mysql. | |
Certain items like '|' are interpreted raw by mysql REGEX. If you | |
search for a single | then you trigger an error because it's | |
expecting content on either side. | |
For consistency sake we escape all '|'. This does mean we wouldn't | |
support something like foo|bar to match completely different | |
things, however, one can argue putting such complicated regex into | |
name search probably means you are doing this wrong. | |
""" | |
return raw_string.replace('|', '\\|') | |
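# Illustrative example: _safe_regex_mysql('foo|bar') returns 'foo\\|bar',
# so the '|' is matched literally rather than as alternation.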
def _get_regexp_ops(connection): | |
"""Return safety filter and db opts for regex.""" | |
regexp_op_map = { | |
'postgresql': '~', | |
'mysql': 'REGEXP', | |
'sqlite': 'REGEXP' | |
} | |
regex_safe_filters = { | |
'mysql': _safe_regex_mysql | |
} | |
db_type = _db_connection_type(connection) | |
return (regex_safe_filters.get(db_type, lambda x: x), | |
regexp_op_map.get(db_type, 'LIKE')) | |
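# Illustrative example: _get_regexp_ops('mysql://nova@db/nova') returns
# (_safe_regex_mysql, 'REGEXP'); an unrecognized backend falls back to
# (identity, 'LIKE').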
def _regex_instance_filter(query, filters): | |
"""Applies regular expression filtering to an Instance query. | |
Returns the updated query. | |
:param query: query to apply filters to | |
:param filters: dictionary of filters with regex values | |
""" | |
model = models.Instance | |
safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection) | |
for filter_name in filters: | |
try: | |
column_attr = getattr(model, filter_name) | |
except AttributeError: | |
continue | |
if 'property' == type(column_attr).__name__: | |
continue | |
filter_val = filters[filter_name] | |
# Sometimes the REGEX filter value is not a string | |
if not isinstance(filter_val, six.string_types): | |
filter_val = str(filter_val) | |
if db_regexp_op == 'LIKE': | |
query = query.filter(column_attr.op(db_regexp_op)( | |
u'%' + filter_val + u'%')) | |
else: | |
filter_val = safe_regex_filter(filter_val) | |
query = query.filter(column_attr.op(db_regexp_op)( | |
filter_val)) | |
return query | |
def _exact_instance_filter(query, filters, legal_keys): | |
"""Applies exact match filtering to an Instance query. | |
Returns the updated query. Modifies filters argument to remove | |
filters consumed. | |
:param query: query to apply filters to | |
:param filters: dictionary of filters; values that are lists, | |
tuples, sets, or frozensets cause an 'IN' test to | |
be performed, while exact matching ('==' operator) | |
is used for other values | |
:param legal_keys: list of keys to apply exact filtering to | |
""" | |
filter_dict = {} | |
model = models.Instance | |
# Walk through all the keys | |
for key in legal_keys: | |
# Skip ones we're not filtering on | |
if key not in filters: | |
continue | |
# OK, filtering on this key; what value do we search for? | |
value = filters.pop(key) | |
if key in ('metadata', 'system_metadata'): | |
column_attr = getattr(model, key) | |
if isinstance(value, list): | |
for item in value: | |
for k, v in item.items(): | |
query = query.filter(column_attr.any(key=k)) | |
query = query.filter(column_attr.any(value=v)) | |
else: | |
for k, v in value.items(): | |
query = query.filter(column_attr.any(key=k)) | |
query = query.filter(column_attr.any(value=v)) | |
elif isinstance(value, (list, tuple, set, frozenset)): | |
if not value: | |
return None # empty IN-predicate; short circuit | |
# Looking for values in a list; apply to query directly | |
column_attr = getattr(model, key) | |
query = query.filter(column_attr.in_(value)) | |
else: | |
# OK, simple exact match; save for later | |
filter_dict[key] = value | |
# Apply simple exact matches | |
if filter_dict: | |
query = query.filter(*[getattr(models.Instance, k) == v | |
for k, v in filter_dict.items()]) | |
return query | |
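# Illustrative example (hypothetical query object `q`):
#     _exact_instance_filter(q, {'uuid': ['u1', 'u2'], 'host': 'node1'},
#                            ['uuid', 'host'])
# applies an IN test for 'uuid', an equality test for 'host', and pops
# both keys from the filters dict; an empty list value short-circuits
# to None (no possible matches).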
def process_sort_params(sort_keys, sort_dirs, | |
default_keys=['created_at', 'id'], | |
default_dir='asc'): | |
"""Process the sort parameters to include default keys. | |
Creates a list of sort keys and a list of sort directions. Adds the default | |
keys to the end of the list if they are not already included. | |
When adding the default keys to the sort keys list, the associated | |
direction is: | |
1) The first element in the 'sort_dirs' list (if specified), else | |
2) 'default_dir' value (Note that 'asc' is the default value since this is | |
the default in sqlalchemy.utils.paginate_query) | |
:param sort_keys: List of sort keys to include in the processed list | |
:param sort_dirs: List of sort directions to include in the processed list | |
:param default_keys: List of sort keys that need to be included in the | |
processed list, they are added at the end of the list | |
if not already specified. | |
:param default_dir: Sort direction associated with each of the default | |
keys that are not supplied, used when they are added | |
to the processed list | |
:returns: list of sort keys, list of sort directions | |
:raise exception.InvalidInput: If more sort directions than sort keys | |
are specified or if an invalid sort | |
direction is specified | |
""" | |
# Determine direction to use for when adding default keys | |
    if sort_dirs:
default_dir_value = sort_dirs[0] | |
else: | |
default_dir_value = default_dir | |
# Create list of keys (do not modify the input list) | |
if sort_keys: | |
result_keys = list(sort_keys) | |
else: | |
result_keys = [] | |
# If a list of directions is not provided, use the default sort direction | |
# for all provided keys | |
if sort_dirs: | |
result_dirs = [] | |
# Verify sort direction | |
for sort_dir in sort_dirs: | |
if sort_dir not in ('asc', 'desc'): | |
msg = _("Unknown sort direction, must be 'desc' or 'asc'") | |
raise exception.InvalidInput(reason=msg) | |
result_dirs.append(sort_dir) | |
else: | |
result_dirs = [default_dir_value for _sort_key in result_keys] | |
# Ensure that the key and direction length match | |
while len(result_dirs) < len(result_keys): | |
result_dirs.append(default_dir_value) | |
    # Unless more directions are specified, which is an error
if len(result_dirs) > len(result_keys): | |
msg = _("Sort direction size exceeds sort key size") | |
raise exception.InvalidInput(reason=msg) | |
# Ensure defaults are included | |
for key in default_keys: | |
if key not in result_keys: | |
result_keys.append(key) | |
result_dirs.append(default_dir_value) | |
return result_keys, result_dirs | |
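# Illustrative examples:
#     process_sort_params(['display_name'], ['asc'])
#         -> (['display_name', 'created_at', 'id'], ['asc', 'asc', 'asc'])
#     process_sort_params(None, None, default_dir='desc')
#         -> (['created_at', 'id'], ['desc', 'desc'])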
@require_context | |
@pick_context_manager_reader_allow_async | |
def instance_get_active_by_window_joined(context, begin, end=None, | |
project_id=None, host=None, | |
columns_to_join=None, limit=None, | |
marker=None): | |
"""Return instances and joins that were active during window.""" | |
query = context.session.query(models.Instance) | |
if columns_to_join is None: | |
columns_to_join_new = ['info_cache', 'security_groups'] | |
manual_joins = ['metadata', 'system_metadata'] | |
else: | |
manual_joins, columns_to_join_new = ( | |
_manual_join_columns(columns_to_join)) | |
for column in columns_to_join_new: | |
if 'extra.' in column: | |
query = query.options(undefer(column)) | |
else: | |
query = query.options(joinedload(column)) | |
query = query.filter(or_(models.Instance.terminated_at == null(), | |
models.Instance.terminated_at > begin)) | |
if end: | |
query = query.filter(models.Instance.launched_at < end) | |
if project_id: | |
query = query.filter_by(project_id=project_id) | |
if host: | |
query = query.filter_by(host=host) | |
if marker is not None: | |
try: | |
marker = _instance_get_by_uuid( | |
context.elevated(read_deleted='yes'), marker) | |
except exception.InstanceNotFound: | |
raise exception.MarkerNotFound(marker=marker) | |
query = sqlalchemyutils.paginate_query( | |
query, models.Instance, limit, ['project_id', 'uuid'], marker=marker) | |
return _instances_fill_metadata(context, query.all(), manual_joins) | |
def _instance_get_all_query(context, project_only=False, joins=None): | |
if joins is None: | |
joins = ['info_cache', 'security_groups'] | |
query = model_query(context, | |
models.Instance, | |
project_only=project_only) | |
for column in joins: | |
if 'extra.' in column: | |
query = query.options(undefer(column)) | |
else: | |
query = query.options(joinedload(column)) | |
return query | |
@pick_context_manager_reader_allow_async | |
def instance_get_all_by_host(context, host, columns_to_join=None): | |
query = _instance_get_all_query(context, joins=columns_to_join) | |
return _instances_fill_metadata(context, | |
query.filter_by(host=host).all(), | |
manual_joins=columns_to_join) | |
def _instance_get_all_uuids_by_host(context, host): | |
"""Return a list of the instance uuids on a given host. | |
Returns a list of UUIDs, not Instance model objects. | |
""" | |
uuids = [] | |
    for row in model_query(context, models.Instance,
                           (models.Instance.uuid,), read_deleted="no").\
            filter_by(host=host).\
            all():
        uuids.append(row[0])
return uuids | |
@pick_context_manager_reader | |
def instance_get_all_by_host_and_node(context, host, node, | |
columns_to_join=None): | |
if columns_to_join is None: | |
manual_joins = [] | |
else: | |
candidates = ['system_metadata', 'metadata'] | |
manual_joins = [x for x in columns_to_join if x in candidates] | |
columns_to_join = list(set(columns_to_join) - set(candidates)) | |
return _instances_fill_metadata(context, | |
_instance_get_all_query( | |
context, | |
joins=columns_to_join).filter_by(host=host). | |
filter_by(node=node).all(), manual_joins=manual_joins) | |
@pick_context_manager_reader | |
def instance_get_all_by_host_and_not_type(context, host, type_id=None): | |
return _instances_fill_metadata(context, | |
_instance_get_all_query(context).filter_by(host=host). | |
filter(models.Instance.instance_type_id != type_id).all()) | |
@pick_context_manager_reader | |
def instance_get_all_by_grantee_security_groups(context, group_ids): | |
if not group_ids: | |
return [] | |
return _instances_fill_metadata(context, | |
_instance_get_all_query(context). | |
join(models.Instance.security_groups). | |
filter(models.SecurityGroup.rules.any( | |
models.SecurityGroupIngressRule.group_id.in_(group_ids))). | |
all()) | |
@require_context | |
@pick_context_manager_reader | |
def instance_floating_address_get_all(context, instance_uuid): | |
if not uuidutils.is_uuid_like(instance_uuid): | |
raise exception.InvalidUUID(uuid=instance_uuid) | |
floating_ips = model_query(context, | |
models.FloatingIp, | |
(models.FloatingIp.address,)).\ | |
join(models.FloatingIp.fixed_ip).\ | |
filter_by(instance_uuid=instance_uuid) | |
return [floating_ip.address for floating_ip in floating_ips] | |
# NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0. | |
@pick_context_manager_reader | |
def instance_get_all_hung_in_rebooting(context, reboot_window): | |
reboot_window = (timeutils.utcnow() - | |
datetime.timedelta(seconds=reboot_window)) | |
# NOTE(danms): this is only used in the _poll_rebooting_instances() | |
# call in compute/manager, so we can avoid the metadata lookups | |
# explicitly | |
return _instances_fill_metadata(context, | |
model_query(context, models.Instance). | |
filter(models.Instance.updated_at <= reboot_window). | |
filter_by(task_state=task_states.REBOOTING).all(), | |
manual_joins=[]) | |
def _retry_instance_update(): | |
"""Wrap with oslo_db_api.wrap_db_retry, and also retry on | |
UnknownInstanceUpdateConflict. | |
""" | |
    def exception_checker(exc):
        return isinstance(exc, exception.UnknownInstanceUpdateConflict)
return oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True, | |
exception_checker=exception_checker) | |
@require_context | |
@_retry_instance_update() | |
@pick_context_manager_writer | |
def instance_update(context, instance_uuid, values, expected=None): | |
return _instance_update(context, instance_uuid, values, expected) | |
@require_context | |
@_retry_instance_update() | |
@pick_context_manager_writer | |
def instance_update_and_get_original(context, instance_uuid, values, | |
columns_to_join=None, expected=None): | |
"""Set the given properties on an instance and update it. Return | |
a shallow copy of the original instance reference, as well as the | |
updated one. | |
:param context: = request context object | |
:param instance_uuid: = instance uuid | |
:param values: = dict containing column values | |
If "expected_task_state" exists in values, the update can only happen | |
when the task state before update matches expected_task_state. Otherwise | |
a UnexpectedTaskStateError is thrown. | |
:returns: a tuple of the form (old_instance_ref, new_instance_ref) | |
Raises NotFound if instance does not exist. | |
""" | |
instance_ref = _instance_get_by_uuid(context, instance_uuid, | |
columns_to_join=columns_to_join) | |
return (copy.copy(instance_ref), _instance_update( | |
context, instance_uuid, values, expected, original=instance_ref)) | |
# NOTE(danms): This updates the instance's metadata list in-place and in | |
# the database to avoid stale data and refresh issues. It assumes the | |
# delete=True behavior of instance_metadata_update(...) | |
def _instance_metadata_update_in_place(context, instance, metadata_type, model, | |
metadata): | |
metadata = dict(metadata) | |
to_delete = [] | |
for keyvalue in instance[metadata_type]: | |
key = keyvalue['key'] | |
if key in metadata: | |
keyvalue['value'] = metadata.pop(key) | |
        else:
to_delete.append(keyvalue) | |
# NOTE: we have to hard_delete here otherwise we will get more than one | |
# system_metadata record when we read deleted for an instance; | |
# regular metadata doesn't have the same problem because we don't | |
# allow reading deleted regular metadata anywhere. | |
if metadata_type == 'system_metadata': | |
for condemned in to_delete: | |
context.session.delete(condemned) | |
instance[metadata_type].remove(condemned) | |
else: | |
for condemned in to_delete: | |
condemned.soft_delete(context.session) | |
for key, value in metadata.items(): | |
newitem = model() | |
newitem.update({'key': key, 'value': value, | |
'instance_uuid': instance['uuid']}) | |
context.session.add(newitem) | |
instance[metadata_type].append(newitem) | |
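# Illustrative call (hypothetical instance `inst` already loaded in this
# session):
#     _instance_metadata_update_in_place(ctxt, inst, 'metadata',
#                                        models.InstanceMetadata,
#                                        {'k1': 'v1'})
# updates any existing 'k1' row in place, soft-deletes every other
# metadata row, and adds rows for keys that did not exist yet.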
def _instance_update(context, instance_uuid, values, expected, original=None): | |
if not uuidutils.is_uuid_like(instance_uuid): | |
        raise exception.InvalidUUID(uuid=instance_uuid)
if expected is None: | |
expected = {} | |
else: | |
# Coerce all single values to singleton lists | |
expected = {k: [None] if v is None else sqlalchemyutils.to_list(v) | |
for (k, v) in expected.items()} | |
# Extract 'expected_' values from values dict, as these aren't actually | |
# updates | |
for field in ('task_state', 'vm_state'): | |
expected_field = 'expected_%s' % field | |
if expected_field in values: | |
value = values.pop(expected_field, None) | |
# Coerce all single values to singleton lists | |
if value is None: | |
expected[field] = [None] | |
else: | |
expected[field] = sqlalchemyutils.to_list(value) | |
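    # e.g. (illustrative): values={'expected_task_state': None} becomes
    # expected={'task_state': [None]}, which matches only rows whose
    # task_state is NULL.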
# Values which need to be updated separately | |
metadata = values.pop('metadata', None) | |
system_metadata = values.pop('system_metadata', None) | |
_handle_objects_related_type_conversions(values) | |
# Hostname is potentially unique, but this is enforced in code rather | |
# than the DB. The query below races, but the number of users of | |
# osapi_compute_unique_server_name_scope is small, and a robust fix | |
# will be complex. This is intentionally left as is for the moment. | |
if 'hostname' in values: | |
_validate_unique_server_name(context, values['hostname']) | |
compare = models.Instance(uuid=instance_uuid, **expected) | |
try: | |
instance_ref = model_query(context, models.Instance, | |
project_only=True).\ | |
update_on_match(compare, 'uuid', values) | |
except update_match.NoRowsMatched: | |
# Update failed. Try to find why and raise a specific error. | |
# We should get here only because our expected values were not current | |
# when update_on_match executed. Having failed, we now have a hint that | |
# the values are out of date and should check them. | |
# This code is made more complex because we are using repeatable reads. | |
# If we have previously read the original instance in the current | |
# transaction, reading it again will return the same data, even though | |
# the above update failed because it has changed: it is not possible to | |
# determine what has changed in this transaction. In this case we raise | |
# UnknownInstanceUpdateConflict, which will cause the operation to be | |
# retried in a new transaction. | |
# Because of the above, if we have previously read the instance in the | |
# current transaction it will have been passed as 'original', and there | |
# is no point refreshing it. If we have not previously read the | |
# instance, we can fetch it here and we will get fresh data. | |
if original is None: | |
original = _instance_get_by_uuid(context, instance_uuid) | |
conflicts_expected = {} | |
conflicts_actual = {} | |
for (field, expected_values) in expected.items(): | |
actual = original[field] | |
if actual not in expected_values: | |
conflicts_expected[field] = expected_values | |
conflicts_actual[field] = actual | |
# Exception properties | |
exc_props = { | |
'instance_uuid': instance_uuid, | |
'expected': conflicts_expected, | |
'actual': conflicts_actual | |
} | |
# There was a conflict, but something (probably the MySQL read view, | |
# but possibly an exceptionally unlikely second race) is preventing us | |
# from seeing what it is. When we go round again we'll get a fresh | |
# transaction and a fresh read view. | |
if len(conflicts_actual) == 0: | |
raise exception.UnknownInstanceUpdateConflict(**exc_props) | |
# Task state gets special handling for convenience. We raise the | |
# specific error UnexpectedDeletingTaskStateError or | |
# UnexpectedTaskStateError as appropriate | |
if 'task_state' in conflicts_actual: | |
conflict_task_state = conflicts_actual['task_state'] | |
if conflict_task_state == task_states.DELETING: | |
exc = exception.UnexpectedDeletingTaskStateError | |
else: | |
exc = exception.UnexpectedTaskStateError | |
# Everything else is an InstanceUpdateConflict | |
else: | |
exc = exception.InstanceUpdateConflict | |
raise exc(**exc_props) | |
if metadata is not None: | |
_instance_metadata_update_in_place(context, instance_ref, | |
'metadata', | |
models.InstanceMetadata, | |
metadata) | |
if system_metadata is not None: | |
_instance_metadata_update_in_place(context, instance_ref, | |
'system_metadata', | |
models.InstanceSystemMetadata, | |
system_metadata) | |
return instance_ref | |
@pick_context_manager_writer | |
def instance_add_security_group(context, instance_uuid, security_group_id): | |
"""Associate the given security group with the given instance.""" | |
sec_group_ref = models.SecurityGroupInstanceAssociation() | |
sec_group_ref.update({'instance_uuid': instance_uuid, | |
'security_group_id': security_group_id}) | |
sec_group_ref.save(context.session) | |
@require_context | |
@pick_context_manager_writer | |
def instance_remove_security_group(context, instance_uuid, security_group_id): | |
"""Disassociate the given security group from the given instance.""" | |
model_query(context, models.SecurityGroupInstanceAssociation).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
filter_by(security_group_id=security_group_id).\ | |
soft_delete() | |
################### | |
@require_context | |
@pick_context_manager_reader | |
def instance_info_cache_get(context, instance_uuid): | |
"""Gets an instance info cache from the table. | |
:param instance_uuid: = uuid of the info cache's instance | |
""" | |
return model_query(context, models.InstanceInfoCache).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
first() | |
@require_context | |
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) | |
@pick_context_manager_writer | |
def instance_info_cache_update(context, instance_uuid, values): | |
"""Update an instance info cache record in the table. | |
:param instance_uuid: = uuid of info cache's instance | |
:param values: = dict containing column values to update | |
""" | |
convert_objects_related_datetimes(values) | |
info_cache = model_query(context, models.InstanceInfoCache).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
first() | |
needs_create = False | |
if info_cache and info_cache['deleted']: | |
raise exception.InstanceInfoCacheNotFound( | |
instance_uuid=instance_uuid) | |
elif not info_cache: | |
# NOTE(tr3buchet): just in case someone blows away an instance's | |
# cache entry, re-create it. | |
values['instance_uuid'] = instance_uuid | |
info_cache = models.InstanceInfoCache(**values) | |
needs_create = True | |
try: | |
with get_context_manager(context).writer.savepoint.using(context): | |
if needs_create: | |
info_cache.save(context.session) | |
else: | |
info_cache.update(values) | |
except db_exc.DBDuplicateEntry: | |
# NOTE(sirp): Possible race if two greenthreads attempt to | |
# recreate the instance cache entry at the same time. First one | |
# wins. | |
pass | |
return info_cache | |
@require_context | |
@pick_context_manager_writer | |
def instance_info_cache_delete(context, instance_uuid): | |
"""Deletes an existing instance_info_cache record | |
:param instance_uuid: = uuid of the instance tied to the cache record | |
""" | |
model_query(context, models.InstanceInfoCache).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
soft_delete() | |
################### | |
def _instance_extra_create(context, values): | |
inst_extra_ref = models.InstanceExtra() | |
inst_extra_ref.update(values) | |
inst_extra_ref.save(context.session) | |
return inst_extra_ref | |
@pick_context_manager_writer | |
def instance_extra_update_by_uuid(context, instance_uuid, values): | |
rows_updated = model_query(context, models.InstanceExtra).\ | |
filter_by(instance_uuid=instance_uuid).\ | |
update(values) | |
if not rows_updated: | |
LOG.debug("Created instance_extra for %s", instance_uuid) | |
create_values = copy.copy(values) | |
create_values["instance_uuid"] = instance_uuid | |
_instance_extra_create(context, create_values) | |
rows_updated = 1 | |
return rows_updated | |
@pick_context_manager_reader | |
def instance_extra_get_by_instance_uuid(context, instance_uuid, | |
columns=None): | |
query = model_query(context, models.InstanceExtra).\ | |
filter_by(instance_uuid=instance_uuid) | |
if columns is None: | |
columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model', | |
'migration_context'] | |
for column in columns: | |
query = query.options(undefer(column)) | |
instance_extra = query.first() | |
return instance_extra | |
################### | |
@require_context | |
@pick_context_manager_writer | |
def key_pair_create(context, values): | |
try: | |
key_pair_ref = models.KeyPair() | |
key_pair_ref.update(values) | |
key_pair_ref.save(context.session) | |
return key_pair_ref | |
except db_exc.DBDuplicateEntry: | |
raise exception.KeyPairExists(key_name=values['name']) | |
@require_context | |
@pick_context_manager_writer | |
def key_pair_destroy(context, user_id, name): | |
result = model_query(context, models.KeyPair).\ | |
filter_by(user_id=user_id).\ | |
filter_by(name=name).\ | |
soft_delete() | |
if not result: | |
raise exception.KeypairNotFound(user_id=user_id, name=name) | |
@require_context | |
@pick_context_manager_reader | |
def key_pair_get(context, user_id, name): | |
result = model_query(context, models.KeyPair).\ | |
filter_by(user_id=user_id).\ | |
filter_by(name=name).\ | |
first() | |
if not result: | |
raise exception.KeypairNotFound(user_id=user_id, name=name) | |
return result | |
@require_context | |
@pick_context_manager_reader | |
def key_pair_get_all_by_user(context, user_id, limit=None, marker=None): | |
marker_row = None | |
if marker is not None: | |
marker_row = model_query(context, models.KeyPair, read_deleted="no").\ | |
filter_by(name=marker).filter_by(user_id=user_id).first() | |
if not marker_row: | |
raise exception.MarkerNotFound(marker=marker) | |
query = model_query(context, models.KeyPair, read_deleted="no").\ | |
filter_by(user_id=user_id) | |
query = sqlalchemyutils.paginate_query( | |
query, models.KeyPair, limit, ['name'], marker=marker_row) | |
return query.all() | |
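# Illustrative usage (assuming a request context `ctxt`):
#     key_pair_get_all_by_user(ctxt, user_id, limit=50, marker='mykey')
# returns up to 50 keypairs sorted by name, starting after the keypair
# named 'mykey'; an unknown marker raises MarkerNotFound.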
@require_context | |
@pick_context_manager_reader | |
def key_pair_count_by_user(context, user_id): | |
return model_query(context, models.KeyPair, read_deleted="no").\ | |
filter_by(user_id=user_id).\ | |
count() | |
################### | |
@pick_context_manager_writer | |
def network_associate(context, project_id, network_id=None, force=False): | |
"""Associate a project with a network. | |
    Called by project_get_networks under certain conditions
    and by the network manager's add_network_to_project().
    Only associates if the project doesn't already have a network,
    or if force is True.
    Force solves a race condition where a fresh project has multiple
    instance builds simultaneously picked up by multiple network hosts,
    which then attempt to associate the project with multiple networks.
    Force should only be used as a direct consequence of a user request;
    automated requests should never use force.
    """
    def network_query(project_filter, net_id=None):
        filter_kwargs = {'project_id': project_filter}
        if net_id is not None:
            filter_kwargs['id'] = net_id
return model_query(context, models.Network, read_deleted="no").\ | |
filter_by(**filter_kwargs).\ | |
with_lockmode('update').\ | |
first() | |
if not force: | |
# find out if project has a network | |
network_ref = network_query(project_id) | |
if force or not network_ref: | |
# in force mode or project doesn't have a network so associate | |
# with a new network | |
# get new network | |
network_ref = network_query(None, network_id) | |
if not network_ref: | |
raise exception.NoMoreNetworks() | |
# associate with network | |
# NOTE(vish): if with_lockmode isn't supported, as in sqlite, | |
# then this has concurrency issues | |
network_ref['project_id'] = project_id | |
context.session.add(network_ref) | |
return network_ref | |
def _network_ips_query(context, network_id): | |
return model_query(context, models.FixedIp, read_deleted="no").\ | |
filter_by(network_id=network_id) | |
@pick_context_manager_reader | |
def network_count_reserved_ips(context, network_id): | |
return _network_ips_query(context, network_id).\ | |
filter_by(reserved=True).\ | |
count() | |
@pick_context_manager_writer | |
def network_create_safe(context, values): | |
network_ref = models.Network() | |
network_ref['uuid'] = uuidutils.generate_uuid() | |
network_ref.update(values) | |
try: | |
network_ref.save(context.session) | |
return network_ref | |
except db_exc.DBDuplicateEntry: | |
raise exception.DuplicateVlan(vlan=values['vlan']) | |
@pick_context_manager_writer | |
def network_delete_safe(context, network_id): | |
result = model_query(context, models.FixedIp, read_deleted="no").\ | |
filter_by(network_id=network_id).\ | |
filter_by(allocated=True).\ | |
count() | |
if result != 0: | |
raise exception.NetworkInUse(network_id=network_id) | |
network_ref = _network_get(context, network_id=network_id) | |
model_query(context, models.FixedIp, read_deleted="no").\ | |
filter_by(network_id=network_id).\ | |
soft_delete() | |
context.session.delete(network_ref) | |
@pick_context_manager_writer | |
def network_disassociate(context, network_id, disassociate_host, | |
disassociate_project): | |
net_update = {} | |
if disassociate_project: | |
net_update['project_id'] = None | |
if disassociate_host: | |
net_update['host'] = None | |
network_update(context, network_id, net_update) | |
def _network_get(context, network_id, project_only='allow_none'): | |
result = model_query(context, models.Network, project_only=project_only).\ | |
filter_by(id=network_id).\ | |
first() | |
if not result: | |
raise exception.NetworkNotFound(network_id=network_id) | |
return result | |
@require_context | |
@pick_context_manager_reader | |
def network_get(context, network_id, project_only='allow_none'): | |
return _network_get(context, network_id, project_only=project_only) | |
@require_context | |
@pick_context_manager_reader | |
def network_get_all(context, project_only): | |
result = model_query(context, models.Network, read_deleted="no", | |
project_only=project_only).all() | |
if not result: | |
raise exception.NoNetworksFound() | |
return result | |
@require_context | |
@pick_context_manager_reader | |
def network_get_all_by_uuids(context, network_uuids, project_only): | |
result = model_query(context, models.Network, read_deleted="no", | |
project_only=project_only).\ | |
filter(models.Network.uuid.in_(network_uuids)).\ | |
all() | |
if not result: | |
raise exception.NoNetworksFound() | |
# check if the result contains all the networks | |
# we are looking for | |
for network_uuid in network_uuids: | |
for network in result: | |
if network['uuid'] == network_uuid: | |
break | |
else: | |
if project_only: | |
raise exception.NetworkNotFoundForProject( | |
network_uuid=network_uuid, project_id=context.project_id) | |
raise exception.NetworkNotFound(network_id=network_uuid) | |
return result | |
def _get_associated_fixed_ips_query(context, network_id, host=None): | |
# NOTE(vish): The ugly joins here are to solve a performance issue and | |
# should be removed once we can add and remove leases | |
# without regenerating the whole list | |
vif_and = and_(models.VirtualInterface.id == | |
models.FixedIp.virtual_interface_id, | |
models.VirtualInterface.deleted == 0) | |
inst_and = and_(models.Instance.uuid == models.FixedIp.instance_uuid, | |
models.Instance.deleted == 0) | |
# NOTE(vish): This subquery left joins the minimum interface id for each | |
# instance. If the join succeeds (i.e. the 11th column is not | |
# null), then the fixed ip is on the first interface. | |
subq = context.session.query( | |
func.min(models.VirtualInterface.id).label("id"), | |
models.VirtualInterface.instance_uuid).\ | |
group_by(models.VirtualInterface.instance_uuid).subquery() | |
subq_and = and_(subq.c.id == models.FixedIp.virtual_interface_id, | |
subq.c.instance_uuid == models.VirtualInterface.instance_uuid) | |
query = context.session.query( | |
models.FixedIp.address, | |
models.FixedIp.instance_uuid, | |
models.FixedIp.network_id, | |
models.FixedIp.virtual_interface_id, | |
models.VirtualInterface.address, | |
models.Instance.hostname, | |
models.Instance.updated_at, | |
models.Instance.created_at, | |
models.FixedIp.allocated, | |
models.FixedIp.leased, | |
subq.c.id).\ | |
filter(models.FixedIp.deleted == 0).\ | |
filter(models.FixedIp.network_id == network_id).\ | |
join((models.VirtualInterface, vif_and)).\ | |
join((models.Instance, inst_and)).\ | |
outerjoin((subq, subq_and)).\ | |
filter(models.FixedIp.instance_uuid != null()).\ | |
filter(models.FixedIp.virtual_interface_id != null()) | |
if host: | |
query = query.filter(models.Instance.host == host) | |
return query |
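# NOTE: the query built above yields one row per (fixed IP, instance)
# pair on the network; the final column (subq.c.id) is non-NULL only
# when the address is on the instance's lowest-id (first) virtual
# interface.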