Skip to content

Instantly share code, notes, and snippets.

@saamich
Created July 8, 2015 19:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save saamich/03ddd11361992691512f to your computer and use it in GitHub Desktop.
Save saamich/03ddd11361992691512f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import eventlet
import socket
import sqlalchemy as sa
from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent import l3_agent
from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api as l3_rpc
from neutron.common import constants
from neutron.common import topics
from neutron import context as n_context
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_db
from neutron.db import model_base
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import periodic_task
from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import service
from neutron.openstack.common import timeutils
from neutron.scheduler import l3_agent_scheduler
# Module-level logger shared by every class in this file.
LOG = logging.getLogger(__name__)

# Value stored in L3HealthcheckLock.lock_id when the healthcheck lock is taken.
HEALTHCHECK_LOCK = 'l3-healthcheck'

# Name prefixes used to recognise router namespaces and the internal /
# external devices plugged into them.
NS_PREFIX = "qrouter-"
INTERNAL_DEV_PREFIX = "qr-"
EXTERNAL_DEV_PREFIX = "qg-"
class L3HealthcheckLock(model_base.BASEV2):
    """Database row used as the healthcheck lock.

    The agent inserts a row to take the lock and deletes it to release it.
    Because ``lock_id`` is the primary key, inserting a second row with the
    same id fails, which is how a concurrent holder is detected.
    """

    # Identifier of the lock (primary key).
    lock_id = sa.Column(sa.String(length=16), primary_key=True)
    # Time at which the lock was taken; compared against the configured
    # lock validity when stale locks are purged.
    valid_timestamp = sa.Column(sa.DateTime, nullable=False)
    # Host name of the agent that inserted the row.
    host = sa.Column(sa.String(length=48), nullable=False)
class L3AgentNotify(proxy.RpcProxy):
    """RPC proxy used to tell the L3 agents that routers have changed."""

    BASE_RPC_API_VERSION = '1.0'

    def __init__(self, topic=topics.L3_AGENT):
        super(L3AgentNotify, self).__init__(
            topic=topic,
            default_version=self.BASE_RPC_API_VERSION)

    def _notification(self, context, method, router_ids):
        """Fan out ``method`` carrying ``routers=router_ids``.

        The message is always cast on ``topics.L3_AGENT``.
        """
        message = self.make_msg(method, routers=router_ids)
        self.fanout_cast(context, message, topic=topics.L3_AGENT)

    def routers_updated(self, context, router_ids):
        """Send ``routers_updated`` for ``router_ids``; no-op when empty."""
        if not router_ids:
            return
        self._notification(context, 'routers_updated', router_ids)
class L3HealthcheckAgent(db_base_plugin_v2.NeutronDbPluginV2,
                         l3_agent_scheduler.ChanceScheduler,
                         l3_db.L3_NAT_db_mixin,
                         l3_agentschedulers_db.L3AgentSchedulerDbMixin):
    """Move routers away from L3 agents that stopped reporting.

    The agent talks to the Neutron database directly through the inherited
    mixins. A row in the ``L3HealthcheckLock`` table serialises migrations
    between several instances of this agent.
    """

    def __init__(self, host):
        """Set up DB access, the RPC notifier and the interface driver.

        :param host: name of the host this agent runs on; stored in the lock
                     row and used to avoid migrating away from ourselves.
        :raises SystemExit: when no interface driver is configured or it
                            cannot be imported.
        """
        db_api.configure_db()
        self.l3_notifier = L3AgentNotify()
        self.session = db_api.get_session()
        self.host = host
        self.conf = cfg.CONF
        self.root_helper = config.get_root_helper(self.conf)
        # agent id -> time at which the agent was first seen (or was put)
        # in the "admin_state_up=False" state.
        self.agents = {}
        self.context = n_context.get_admin_context()
        self.last_plugin_ping = timeutils.utcnow()

        if not self.conf.interface_driver:
            raise SystemExit(_('An interface driver must be specified'))
        try:
            self.driver = importutils.import_object(
                self.conf.interface_driver,
                self.conf
            )
        except Exception:
            msg = _("Error importing interface driver "
                    "'%s'") % self.conf.interface_driver
            raise SystemExit(msg)

    @staticmethod
    def _seconds_since(moment):
        """Return the number of seconds elapsed since ``moment``.

        ``total_seconds()`` is used instead of ``timedelta.seconds``: the
        latter only holds the sub-day component, so it wraps after 24 hours
        and is close to 86400 for small negative deltas.
        """
        return (timeutils.utcnow() - moment).total_seconds()

    def lock(self, context):
        """Try to take the healthcheck lock by inserting the lock row.

        :returns: True when the row was inserted, False when the insert
                  failed (a warning is logged in that case).
        """
        try:
            with self.session.begin(subtransactions=True):
                hc_lock = L3HealthcheckLock(lock_id=HEALTHCHECK_LOCK,
                                            valid_timestamp=timeutils.utcnow(),
                                            host=self.host)
                self.session.add(hc_lock)
        except Exception:
            # Not a bare "except:" so that SystemExit / KeyboardInterrupt and
            # greenlet termination are not swallowed here.
            LOG.warning(_("Unable to get a lock, migration in progress"))
            return False
        return True

    def unlock(self, context):
        """Release the healthcheck lock if this host still holds it."""
        with self.session.begin(subtransactions=True):
            # Only our own row is removed: if our lock was purged as stale and
            # another host took it meanwhile, that lock must stay in place.
            query = self.session.query(L3HealthcheckLock)
            query = query.filter_by(lock_id=HEALTHCHECK_LOCK, host=self.host)
            hc_lock = query.first()
            if hc_lock:
                self.session.delete(hc_lock)

    def purge_locks(self, context):
        """Delete a lock row older than ``L3HEALTHCHECK.lock_validity``."""
        delta = datetime.timedelta(
            seconds=cfg.CONF.L3HEALTHCHECK.lock_validity)
        oldest_valid = timeutils.utcnow() - delta
        with self.session.begin(subtransactions=True):
            query = self.session.query(L3HealthcheckLock)
            query = query.filter(
                L3HealthcheckLock.valid_timestamp < oldest_valid)
            hc_lock = query.first()
            if hc_lock:
                self.session.delete(hc_lock)

    def check_isolated(self, context):
        """Ping the plugin host and record the time of the successful ping.

        Any exception raised by ``utils.execute`` propagates to the caller,
        in which case ``last_plugin_ping`` is left untouched.
        """
        utils.execute(["ping", "-c", "1",
                       cfg.CONF.L3HEALTHCHECK.plugin_host])
        self.last_plugin_ping = timeutils.utcnow()

    def exec_post_script(self):
        """Run ``L3HEALTHCHECK.post_script`` when one is configured.

        Any exception raised by ``utils.execute`` propagates to the caller.
        """
        post_script = cfg.CONF.L3HEALTHCHECK.post_script
        if not post_script:
            return
        utils.execute([post_script])

    def _destroy_router_namespaces(self):
        """Clean up every namespace whose name starts with ``NS_PREFIX``.

        A failure on one namespace is logged and does not stop the others.
        """
        root_ip = ip_lib.IPWrapper(self.root_helper)
        for ns in root_ip.get_namespaces(self.root_helper):
            if ns.startswith(NS_PREFIX):
                try:
                    self._destroy_router_namespace(ns)
                except Exception:
                    LOG.exception(_("Failed deleting namespace '%s'"), ns)

    def _destroy_router_namespace(self, namespace):
        """Unplug the router devices found in ``namespace``.

        Only devices are unplugged; this method does not delete the
        namespace itself.
        """
        ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=namespace)
        for d in ns_ip.get_devices(exclude_loopback=True):
            if d.name.startswith(INTERNAL_DEV_PREFIX):
                # device is on default bridge
                self.driver.unplug(d.name, namespace=namespace,
                                   prefix=INTERNAL_DEV_PREFIX)
            elif d.name.startswith(EXTERNAL_DEV_PREFIX):
                self.driver.unplug(d.name,
                                   bridge=self.conf.external_network_bridge,
                                   namespace=namespace,
                                   prefix=EXTERNAL_DEV_PREFIX)

    def healthcheck(self, context):
        """Run one healthcheck pass.

        1. If the plugin host has not answered a ping for longer than
           ``isolated_period``, unplug the local router namespaces.
        2. For every L3 agent: migrate its routers when it is enabled but
           down, or re-enable it when it has been alive and disabled for
           longer than ``validity_period``.
        3. Schedule every router that is not hosted by any L3 agent.

        :param context: passed through to ``lock`` / ``unlock``; all Neutron
                        calls use the admin context created in ``__init__``.
        """
        if (self._seconds_since(self.last_plugin_ping) >
                cfg.CONF.L3HEALTHCHECK.isolated_period):
            self._destroy_router_namespaces()

        # Stays None when no agent is registered; see
        # _reschedule_unhosted_routers for why the last one is kept.
        agent = None
        for agent in self.get_agents(self.context):
            if agent['agent_type'] != constants.AGENT_TYPE_L3:
                continue
            is_down = agents_db.AgentDbMixin.is_agent_down(
                agent['heartbeat_timestamp'])
            if agent['admin_state_up']:
                LOG.info(agent['heartbeat_timestamp'])
                LOG.info(agent['admin_state_up'])
                if is_down:
                    LOG.info(is_down)
                    self._migrate_agent(context, agent)
            elif not is_down:
                self._reenable_agent_when_due(agent)

        self._reschedule_unhosted_routers(context, agent)

    def _reenable_agent_when_due(self, agent):
        """Track an alive but disabled L3 agent and re-enable it when due.

        The agent is re-enabled once it has been tracked for longer than
        ``L3HEALTHCHECK.validity_period`` seconds.
        """
        agent_id = agent['id']
        if agent_id not in self.agents:
            # First time this disabled agent is seen alive: start the clock.
            self.agents[agent_id] = timeutils.utcnow()

        disabled_for = self._seconds_since(self.agents[agent_id])
        if disabled_for > cfg.CONF.L3HEALTHCHECK.validity_period:
            self.update_agent(self.context, agent_id,
                              {'agent': {'admin_state_up': True}})
            LOG.info("Change admin_state_up to True for agent: %s", agent_id)
            l3_rpc.L3AgentNotify.agent_updated(self.context,
                                               True, agent['host'])
            del self.agents[agent_id]

    def _migrate_agent(self, context, agent):
        """Disable a dead L3 agent and reschedule all of its routers."""
        # check if agent flapping
        if self.host == agent['host']:
            LOG.error("Can not migrate from myself, check l3 agent status!")
            return

        LOG.error(_("Start migration of the agent %s"),
                  agent['host'])
        if not self.lock(context):
            return

        try:
            self.update_agent(self.context, agent['id'],
                              {'agent': {'admin_state_up': False}})
            self.agents[agent['id']] = timeutils.utcnow()

            routers = self.list_routers_on_l3_agent(self.context,
                                                    agent['id'])
            router_ids = []
            for router in routers['routers']:
                LOG.debug("Reschedule router %s from agent %s",
                          router['id'], agent['id'])
                self.schedule(self, self.context, router['id'])
                LOG.debug("Router %s rescheduled", router['id'])
                self.remove_router_from_l3_agent(self.context,
                                                 agent['id'],
                                                 router['id'])
                router_ids.append(router['id'])

            LOG.debug(_("Notify all l3 agent for router migrations"))
            self.l3_notifier.routers_updated(self.context,
                                             router_ids)
            l3_rpc.L3AgentNotify.agent_updated(self.context,
                                               True, agent['host'])
            self.exec_post_script()
        finally:
            # Always give the lock back, even when a step above failed.
            self.unlock(context)

    def _reschedule_unhosted_routers(self, context, last_agent):
        """Schedule every router that no L3 agent is hosting.

        :param last_agent: last agent returned by ``get_agents`` during this
                           pass, or None when there was none.
        """
        for router in self.get_routers(self.context):
            hosting = self.list_l3_agents_hosting_router(self.context,
                                                         router['id'])
            if hosting['agents']:
                continue
            if not self.lock(context):
                continue

            try:
                LOG.info('Migrate not hosting router: %s', router['id'])
                LOG.debug("Reschedule router %s ", router['id'])
                self.schedule(self, self.context, router['id'])
                LOG.debug("Router %s rescheduled", router['id'])

                LOG.debug(_("Notify all l3 agent for router migrations"))
                self.l3_notifier.routers_updated(self.context,
                                                 [router['id']])
                # NOTE(review): the original code notified the host of
                # whichever agent the agent loop ended on. That is kept here,
                # guarded so an empty agent list no longer raises NameError.
                # Confirm whether this notification is wanted at all.
                if last_agent is not None:
                    l3_rpc.L3AgentNotify.agent_updated(self.context,
                                                       True,
                                                       last_agent['host'])
                self.exec_post_script()
            finally:
                # Always give the lock back, even when a step above failed.
                self.unlock(context)
class L3healthcheckManager(periodic_task.PeriodicTasks):
    """Expose the healthcheck agent's operations as periodic tasks."""

    def __init__(self, host):
        """Create the underlying agent for ``host``."""
        # Let the periodic-task base class initialise itself; versions of
        # PeriodicTasks that set up per-instance state in their constructor
        # need this, and it is harmless for those that do not.
        super(L3healthcheckManager, self).__init__()
        self.l3_hc_agent = L3HealthcheckAgent(host)

    @periodic_task.periodic_task
    def healthcheck(self, context):
        """Run one healthcheck pass on the agent."""
        LOG.info("Healthcheck - healthcheck")
        self.l3_hc_agent.healthcheck(context)

    @periodic_task.periodic_task
    def purge_locks(self, context):
        """Ask the agent to remove an expired lock."""
        LOG.info("Healthcheck - purge_locks")
        self.l3_hc_agent.purge_locks(context)

    @periodic_task.periodic_task
    def check_isolated(self, context):
        """Ask the agent to check that the plugin host is reachable."""
        LOG.info("Healthcheck - check_isolated")
        self.l3_hc_agent.check_isolated(context)
class L3healthcheck(service.Service):
    """Service that runs the healthcheck periodic tasks on a timer."""

    # Options registered under the [L3HEALTHCHECK] group.
    OPTS = [
        cfg.StrOpt(
            'post_script',
            help='Script executed after rescheduling'),
        cfg.StrOpt(
            'plugin_host',
            # This used to default to the integer 30 (copied from the IntOpt
            # below), which made the agent ping "30" when left unset. The
            # loopback address is a string default that is always reachable,
            # so an unconfigured agent does not consider itself isolated.
            default='127.0.0.1',
            help='Address of the plugin host'),
        cfg.IntOpt(
            'isolated_period',
            default=30,
            help='Seconds before an isolated agent removes routers'),
        cfg.IntOpt(
            'check_interval',
            default=10,
            help='Seconds between periodic check'),
        cfg.IntOpt(
            'lock_validity',
            default=5,
            help='Seconds of the lock validity'),
        cfg.IntOpt(
            'validity_period',
            default=60,
            help='Reset admin_state_up after validity period'),
    ]

    def __init__(self, host):
        """Create the periodic-task manager for ``host``."""
        super(L3healthcheck, self).__init__()
        self.manager = L3healthcheckManager(host)

    def start(self):
        """Start the service and schedule the periodic tasks.

        The tasks are run every ``L3HEALTHCHECK.check_interval`` seconds.
        """
        super(L3healthcheck, self).start()
        # NOTE(review): the two trailing None values are presumably the
        # timer's initial delay and the context handed to
        # run_periodic_tasks -- confirm against ThreadGroup.add_timer.
        self.tg.add_timer(
            cfg.CONF.L3HEALTHCHECK.check_interval,
            self.manager.run_periodic_tasks,
            None,
            None
        )
def main():
    """Monkey-patch with eventlet, then run the healthcheck service.

    Blocks until the launched service finishes.
    """
    LOG.debug("Starting....")
    eventlet.monkey_patch()
    hostname = socket.gethostname()
    launcher = service.launch(L3healthcheck(hostname))
    launcher.wait()
if __name__ == "__main__":
    product_name = "l3healthcheck"
    conf = cfg.CONF

    # The configuration is parsed first; the options are registered after.
    conf(project=product_name)

    # Options of this service live in their own group; the ones borrowed
    # from the L3 agent, interface drivers and external processes are
    # registered in the default group.
    conf.register_opts(L3healthcheck.OPTS, "L3HEALTHCHECK")
    for opts in (l3_agent.L3NATAgent.OPTS,
                 interface.OPTS,
                 external_process.OPTS):
        conf.register_opts(opts)
    config.register_root_helper(conf)
    config.register_interface_driver_opts_helper(conf)

    logging.setup(product_name)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment