Created
July 8, 2015 19:57
-
-
Save saamich/03ddd11361992691512f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com> | |
# | |
# Author: Sylvain Afchain <sylvain.afchain@enovance.com> | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you may | |
# not use this file except in compliance with the License. You may obtain | |
# a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
# License for the specific language governing permissions and limitations | |
# under the License. | |
import datetime | |
import eventlet | |
import socket | |
import sqlalchemy as sa | |
from oslo.config import cfg | |
from neutron.agent.common import config | |
from neutron.agent import l3_agent | |
from neutron.agent.linux import external_process | |
from neutron.agent.linux import interface | |
from neutron.agent.linux import ip_lib | |
from neutron.agent.linux import utils | |
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api as l3_rpc | |
from neutron.common import constants | |
from neutron.common import topics | |
from neutron import context as n_context | |
from neutron.db import agents_db | |
from neutron.db import api as db_api | |
from neutron.db import db_base_plugin_v2 | |
from neutron.db import l3_agentschedulers_db | |
from neutron.db import l3_db | |
from neutron.db import model_base | |
from neutron.openstack.common import importutils | |
from neutron.openstack.common import log as logging | |
from neutron.openstack.common.notifier import api as notifier_api | |
from neutron.openstack.common import periodic_task | |
from neutron.openstack.common.rpc import proxy | |
from neutron.openstack.common import service | |
from neutron.openstack.common import timeutils | |
from neutron.scheduler import l3_agent_scheduler | |
# Module-wide logger shared by every class and function in this file.
LOG = logging.getLogger(__name__)

# Value stored in L3HealthcheckLock.lock_id for the healthcheck lock row.
HEALTHCHECK_LOCK = "l3-healthcheck"

# Name prefixes used when cleaning up: namespaces starting with NS_PREFIX
# are treated as router namespaces, and devices inside them are unplugged
# according to whether their name starts with the internal or the external
# device prefix.
NS_PREFIX = 'qrouter-'
INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-'
class L3HealthcheckLock(model_base.BASEV2):
    """Database row that represents a held healthcheck lock.

    A row records which host took the lock and when; ``lock_id`` is the
    primary key of the table.
    """

    # Identifier of the lock (primary key, at most 16 characters).
    lock_id = sa.Column(sa.String(length=16), primary_key=True)
    # Time at which the lock row was written; compared against the
    # configured lock validity when stale locks are purged.
    valid_timestamp = sa.Column(sa.DateTime, nullable=False)
    # Name of the host that wrote the row (at most 48 characters).
    host = sa.Column(sa.String(length=48), nullable=False)
class L3AgentNotify(proxy.RpcProxy):
    """RPC proxy used to tell L3 agents that routers have changed."""

    BASE_RPC_API_VERSION = '1.0'

    def __init__(self, topic=topics.L3_AGENT):
        """Create the proxy.

        :param topic: RPC topic the notifications are sent on; defaults to
                      the L3 agent topic.
        """
        super(L3AgentNotify, self).__init__(
            topic=topic, default_version=self.BASE_RPC_API_VERSION)

    def _notification(self, context, method, router_ids):
        """Fan ``method`` out to every agent listening on this proxy's topic.

        :param context: request context forwarded to the RPC layer.
        :param method: name of the remote method to invoke.
        :param router_ids: list of router ids sent as the ``routers``
                           argument of the message.
        """
        message = self.make_msg(method, routers=router_ids)
        # Send on the topic given to the constructor instead of a hard-coded
        # one, so a caller-supplied topic is honoured.  With the default
        # constructor argument this is still the L3 agent topic.
        self.fanout_cast(context, message, topic=self.topic)

    def routers_updated(self, context, router_ids):
        """Notify agents that ``router_ids`` changed; no-op when empty."""
        if router_ids:
            self._notification(context, 'routers_updated', router_ids)
class L3HealthcheckAgent(db_base_plugin_v2.NeutronDbPluginV2,
                         l3_agent_scheduler.ChanceScheduler,
                         l3_db.L3_NAT_db_mixin,
                         l3_agentschedulers_db.L3AgentSchedulerDbMixin):
    """Watch L3 agents and reschedule routers away from dead ones.

    The object acts both as the plugin (database access through the
    Neutron mixins) and as the scheduler: ``self`` is passed as the plugin
    argument of ``schedule``.  A database row (``L3HealthcheckLock``) is
    used as a lock around each migration.
    """

    def __init__(self, host):
        """Set up database access, notifier and the interface driver.

        :param host: name of the host this process runs on; stored in lock
                     rows and compared with agents' hosts.
        :raises SystemExit: when no interface driver is configured or the
                            configured one cannot be imported.
        """
        db_api.configure_db()
        self.l3_notifier = L3AgentNotify()
        self.session = db_api.get_session()
        self.host = host
        self.conf = cfg.CONF
        self.root_helper = config.get_root_helper(self.conf)
        # agent id -> time at which the agent was first seen (or was set)
        # administratively down while still reporting heartbeats.
        self.agents = {}
        self.context = n_context.get_admin_context()
        # Updated by check_isolated() after every successful ping.
        self.last_plugin_ping = timeutils.utcnow()

        if not self.conf.interface_driver:
            raise SystemExit(_('An interface driver must be specified'))
        try:
            self.driver = importutils.import_object(
                self.conf.interface_driver,
                self.conf
            )
        except Exception:
            msg = _("Error importing interface driver "
                    "'%s'") % self.conf.interface_driver
            raise SystemExit(msg)

    def lock(self, context):
        """Try to take the healthcheck lock by inserting the lock row.

        :param context: unused; kept for interface compatibility.
        :returns: True when the row was written, False when writing failed
                  (treated as "another migration is in progress").
        """
        try:
            with self.session.begin(subtransactions=True):
                hc_lock = L3HealthcheckLock(
                    lock_id=HEALTHCHECK_LOCK,
                    valid_timestamp=timeutils.utcnow(),
                    host=self.host)
                self.session.add(hc_lock)
        except Exception:
            # Exception rather than a bare except, so that SystemExit,
            # KeyboardInterrupt and greenlet termination are not swallowed.
            LOG.warning(_("Unable to get a lock, migration in progress"))
            return False
        return True

    def unlock(self, context):
        """Delete the first lock row found, if any.

        :param context: unused; kept for interface compatibility.
        """
        with self.session.begin(subtransactions=True):
            hc_lock = self.session.query(L3HealthcheckLock).first()
            if hc_lock:
                self.session.delete(hc_lock)

    def purge_locks(self, context):
        """Delete one lock row older than the configured lock validity.

        :param context: unused; kept for interface compatibility.
        """
        delta = datetime.timedelta(
            seconds=cfg.CONF.L3HEALTHCHECK.lock_validity)
        oldest_valid = timeutils.utcnow() - delta
        with self.session.begin(subtransactions=True):
            query = self.session.query(L3HealthcheckLock)
            query = query.filter(
                L3HealthcheckLock.valid_timestamp < oldest_valid)
            hc_lock = query.first()
            if hc_lock:
                self.session.delete(hc_lock)

    def check_isolated(self, context):
        """Ping the plugin host and record the time of a successful ping.

        Any error raised by the ping command propagates to the caller, in
        which case ``last_plugin_ping`` is left untouched.

        :param context: unused; kept for interface compatibility.
        """
        utils.execute(["ping", "-c", "1",
                       cfg.CONF.L3HEALTHCHECK.plugin_host])
        self.last_plugin_ping = timeutils.utcnow()

    def exec_post_script(self):
        """Run the configured post script, if one is configured.

        Errors raised while executing the script propagate to the caller.
        """
        post_script = cfg.CONF.L3HEALTHCHECK.post_script
        if not post_script:
            return
        utils.execute([post_script])

    def _destroy_router_namespaces(self):
        """Unplug the router devices of every router namespace found.

        A failure on one namespace is logged and does not stop the others.
        """
        root_ip = ip_lib.IPWrapper(self.root_helper)
        for ns in root_ip.get_namespaces(self.root_helper):
            if not ns.startswith(NS_PREFIX):
                continue
            try:
                self._destroy_router_namespace(ns)
            except Exception:
                LOG.exception(_("Failed deleting namespace '%s'"), ns)

    def _destroy_router_namespace(self, namespace):
        """Unplug internal and external router devices in ``namespace``."""
        ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=namespace)
        for d in ns_ip.get_devices(exclude_loopback=True):
            if d.name.startswith(INTERNAL_DEV_PREFIX):
                # device is on default bridge
                self.driver.unplug(d.name, namespace=namespace,
                                   prefix=INTERNAL_DEV_PREFIX)
            elif d.name.startswith(EXTERNAL_DEV_PREFIX):
                self.driver.unplug(d.name,
                                   bridge=self.conf.external_network_bridge,
                                   namespace=namespace,
                                   prefix=EXTERNAL_DEV_PREFIX)

    def _reenable_after_validity(self, agent):
        """Re-enable a disabled but alive agent once the period elapsed.

        The first time the agent is seen in that state the current time is
        remembered; when it has stayed in that state for longer than
        ``validity_period`` seconds, ``admin_state_up`` is set back to True.
        """
        agent_id = agent['id']
        if agent_id not in self.agents:
            self.agents[agent_id] = timeutils.utcnow()
        elapsed = timeutils.utcnow() - self.agents[agent_id]
        # total_seconds(): timedelta.seconds wraps around every 24 hours.
        if elapsed.total_seconds() > cfg.CONF.L3HEALTHCHECK.validity_period:
            self.update_agent(self.context, agent_id,
                              {'agent': {'admin_state_up': True}})
            LOG.info("Change admin_state_up to True for agent: %s", agent_id)
            l3_rpc.L3AgentNotify.agent_updated(self.context,
                                               True, agent['host'])
            del self.agents[agent_id]

    def _migrate_routers_from_agent(self, context, agent):
        """Disable a dead agent and reschedule all of its routers.

        Nothing is done when the dead agent lives on this very host or when
        the lock cannot be taken.  The lock is always released, even when a
        step of the migration raises.
        """
        # check if agent flapping
        if self.host == agent['host']:
            LOG.error("Can not migrate from myself, check l3 agent status!")
            return
        LOG.error(_("Start migration of the agent %s"), agent['host'])
        if not self.lock(context):
            return
        try:
            self.update_agent(self.context, agent['id'],
                              {'agent': {'admin_state_up': False}})
            self.agents[agent['id']] = timeutils.utcnow()
            routers = self.list_routers_on_l3_agent(self.context,
                                                    agent['id'])
            router_ids = []
            for router in routers['routers']:
                LOG.debug("Reschedule router %s from agent %s",
                          router['id'], agent['id'])
                # This object is both the scheduler and the plugin.
                self.schedule(self, self.context, router['id'])
                LOG.debug("Router %s rescheduled", router['id'])
                self.remove_router_from_l3_agent(self.context,
                                                 agent['id'],
                                                 router['id'])
                router_ids.append(router['id'])
            LOG.debug(_("Notify all l3 agent for router migrations"))
            self.l3_notifier.routers_updated(self.context, router_ids)
            l3_rpc.L3AgentNotify.agent_updated(self.context,
                                               True, agent['host'])
            self.exec_post_script()
        finally:
            self.unlock(context)

    def _reschedule_unhosted_routers(self, context):
        """Schedule every router that no L3 agent currently hosts."""
        for router in self.get_routers(self.context):
            router_id = router['id']
            hosting = self.list_l3_agents_hosting_router(self.context,
                                                         router_id)
            if hosting['agents']:
                continue
            if not self.lock(context):
                continue
            try:
                LOG.info('Migrate not hosting router: %s', router_id)
                LOG.debug("Reschedule router %s ", router_id)
                self.schedule(self, self.context, router_id)
                LOG.debug("Router %s rescheduled", router_id)
                LOG.debug(_("Notify all l3 agent for router migrations"))
                self.l3_notifier.routers_updated(self.context, [router_id])
                # Notify the agents that host the router after scheduling.
                # (Previously a stale loop variable from the agent scan was
                # used here, which pointed at an unrelated agent or did not
                # exist at all.)  Assumes the returned agent dicts carry a
                # 'host' key like those from get_agents() -- TODO confirm.
                hosting = self.list_l3_agents_hosting_router(self.context,
                                                             router_id)
                for hosting_agent in hosting['agents']:
                    l3_rpc.L3AgentNotify.agent_updated(
                        self.context, True, hosting_agent['host'])
                self.exec_post_script()
            finally:
                self.unlock(context)

    def healthcheck(self, context):
        """Run one healthcheck pass.

        1. When the last successful plugin ping is older than
           ``isolated_period`` seconds, unplug all local router devices.
        2. For each L3 agent: migrate its routers when it is enabled but
           down, or track/re-enable it when it is disabled but alive.
        3. Schedule routers that are not hosted by any L3 agent.

        :param context: passed through to ``lock``/``unlock``.
        """
        isolated_for = timeutils.utcnow() - self.last_plugin_ping
        # total_seconds(): timedelta.seconds wraps around every 24 hours.
        if (isolated_for.total_seconds() >
                cfg.CONF.L3HEALTHCHECK.isolated_period):
            self._destroy_router_namespaces()

        for agent in self.get_agents(self.context):
            if agent['agent_type'] != constants.AGENT_TYPE_L3:
                continue
            agent_down = agents_db.AgentDbMixin.is_agent_down(
                agent['heartbeat_timestamp'])
            if agent['admin_state_up']:
                LOG.info(agent['heartbeat_timestamp'])
                LOG.info(agent['admin_state_up'])
                if agent_down:
                    LOG.info(agent_down)
                    self._migrate_routers_from_agent(context, agent)
            elif not agent_down:
                self._reenable_after_validity(agent)

        # if router not hosting on any l3 agent
        self._reschedule_unhosted_routers(context)
class L3healthcheckManager(periodic_task.PeriodicTasks):
    """Expose the healthcheck agent's operations as periodic tasks."""

    def __init__(self, host):
        """Build the wrapped ``L3HealthcheckAgent`` for ``host``."""
        hc_agent = L3HealthcheckAgent(host)
        self.l3_hc_agent = hc_agent

    @periodic_task.periodic_task
    def healthcheck(self, context):
        """Log, then run one healthcheck pass on the wrapped agent."""
        LOG.info("Healthcheck - healthcheck")
        worker = self.l3_hc_agent
        worker.healthcheck(context)

    @periodic_task.periodic_task
    def purge_locks(self, context):
        """Log, then ask the wrapped agent to purge expired locks."""
        LOG.info("Healthcheck - purge_locks")
        worker = self.l3_hc_agent
        worker.purge_locks(context)

    @periodic_task.periodic_task
    def check_isolated(self, context):
        """Log, then ask the wrapped agent to ping the plugin host."""
        LOG.info("Healthcheck - check_isolated")
        worker = self.l3_hc_agent
        worker.check_isolated(context)
class L3healthcheck(service.Service):
    """Service that runs the healthcheck manager's periodic tasks."""

    # Options registered under the L3HEALTHCHECK group by the script entry
    # point of this file.
    OPTS = [
        cfg.StrOpt(
            'post_script',
            help='Script executed after rescheduling'),
        cfg.StrOpt(
            'plugin_host',
            # A string option needs a string default.  The previous default
            # was the integer 30 (copied from the option below), which is
            # not a usable address: every ping would fail and the host
            # would consider itself isolated.  Loopback keeps isolation
            # handling inactive until a real address is configured.
            default='127.0.0.1',
            help='Address of the plugin host'),
        cfg.IntOpt(
            'isolated_period',
            default=30,
            help='Seconds before an isolated agent removes routers'),
        cfg.IntOpt(
            'check_interval',
            default=10,
            help='Seconds between periodic check'),
        cfg.IntOpt(
            'lock_validity',
            default=5,
            help='Seconds of the lock validity'),
        cfg.IntOpt(
            'validity_period',
            default=60,
            help='Reset admin_state_up after validity period'),
    ]

    def __init__(self, host):
        """Create the service and its periodic-task manager for ``host``."""
        super(L3healthcheck, self).__init__()
        self.manager = L3healthcheckManager(host)

    def start(self):
        """Start the service and schedule the periodic tasks.

        The manager's ``run_periodic_tasks`` is registered on the thread
        group with ``check_interval`` as the timer interval.
        """
        super(L3healthcheck, self).start()
        interval = cfg.CONF.L3HEALTHCHECK.check_interval
        # The two trailing None values are presumably the timer's initial
        # delay and the context handed to run_periodic_tasks -- confirm
        # against the thread group's add_timer signature.
        self.tg.add_timer(
            interval,
            self.manager.run_periodic_tasks,
            None,
            None
        )
def main():
    """Monkey-patch with eventlet, then launch the service and wait on it.

    The service is created for the local hostname as returned by
    ``socket.gethostname()``.
    """
    LOG.debug("Starting....")
    eventlet.monkey_patch()
    hostname = socket.gethostname()
    launcher = service.launch(L3healthcheck(hostname))
    launcher.wait()
if __name__ == "__main__":
    # Script entry point: parse configuration, register every option group
    # this process reads, set up logging, then hand over to main().
    product_name = "l3healthcheck"
    cfg.CONF(project=product_name)

    # Options of this service live in their own L3HEALTHCHECK group.
    cfg.CONF.register_opts(L3healthcheck.OPTS, "L3HEALTHCHECK")
    # Options borrowed from the L3 agent, the interface drivers and the
    # external process helper are registered in the default group, in the
    # same order as before.
    for borrowed_opts in (l3_agent.L3NATAgent.OPTS,
                          interface.OPTS,
                          external_process.OPTS):
        cfg.CONF.register_opts(borrowed_opts)

    config.register_root_helper(cfg.CONF)
    config.register_interface_driver_opts_helper(cfg.CONF)

    logging.setup(product_name)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment