Skip to content

Instantly share code, notes, and snippets.

@rodrigogansobarbieri
Last active February 10, 2021 17:51
Show Gist options
  • Save rodrigogansobarbieri/bb7d6c6e0ccfc26191ef2e2e27265653 to your computer and use it in GitHub Desktop.
Save rodrigogansobarbieri/bb7d6c6e0ccfc26191ef2e2e27265653 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Evacuation assistance script to assist in finding destinations for
evacuating instances that are in affinity or anti-affinity groups,
as a workaround for bug https://bugs.launchpad.net/bugs/1821755.
Authors:
- rodrigo.barbieri@canonical.com
Latest Code:
- https://gist.github.com/rodrigogansobarbieri/bb7d6c6e0ccfc26191ef2e2e27265653
Unit tests:
- https://gist.github.com/rodrigogansobarbieri/9e40d5746cd18e4c06a3c1cc7aece287
"""
import argparse
import json
import logging
import os
import subprocess
import sys
# Module-wide logger; output format and level come from the
# logging.basicConfig() call made in parse_args().
LOG = logging.getLogger(name=__name__)
def parse_args(params):
    """Parse the command-line arguments and configure logging from them.

    :param params: list of argument strings (e.g. ``sys.argv[1:]``).
    :returns: ``argparse.Namespace`` with ``host`` and ``loglevel``.
    Exits via argparse (status 2) on missing or invalid arguments.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--host',
        type=str,
        required=True,
        help="Host being evacuated.",
    )
    cli.add_argument(
        '--loglevel',
        choices=['INFO', 'DEBUG'],
        default='INFO',
        required=False,
        help="Set the log level. Default: 'INFO'.",
    )
    parsed = cli.parse_args(params)
    # Configure logging right away so every later message honours --loglevel
    logging.basicConfig(format='%(levelname)s: %(message)s',
                        level=parsed.loglevel)
    LOG.debug("Running with arguments: {}".format(vars(parsed)))
    return parsed
def run_cmd(cmd):
    """Run a command and return its standard output as text.

    :param cmd: the command line as a string, split on whitespace (no shell
        is involved, so quotes are not interpreted), or an already split
        sequence of arguments.
    :returns: the command's decoded stdout.
    :raises CommandFailedException: if the command exits with a non-zero
        return code; the message includes the command, return code,
        stdout and stderr.
    """
    LOG.debug("Running command: {}".format(cmd))
    argv = cmd.split() if isinstance(cmd, str) else list(cmd)
    pipes = subprocess.Popen(
        argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = pipes.communicate()
    # With PIPE and no text mode both streams are bytes. Decode them with
    # the console encoding as before, but fall back to UTF-8 when sys.stdout
    # has been replaced by an object without an encoding (or is None), and
    # replace undecodable bytes instead of aborting with UnicodeDecodeError.
    encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
    stdout = stdout.decode(encoding, errors='replace')
    stderr = stderr.decode(encoding, errors='replace')
    msg = ("Result of command {}\n"
           "return code: {}\n"
           "stdout: {}\n"
           "stderr: {}".format(cmd, pipes.returncode, stdout, stderr))
    if pipes.returncode != 0:
        raise CommandFailedException(msg)
    LOG.debug(msg)
    return stdout
class AffinityDestinationConflict(Exception):
    """Raised when a VM's affinity peers already sit on different hosts."""

    pass
class CommandFailedException(Exception):
    """Raised by run_cmd when an external command exits non-zero."""

    pass
class EvacuationAssistant(object):
    """Analyze cloud data and suggest live-migration commands for a host.

    Data is gathered through the ``openstack`` CLI. For each instance on the
    host being evacuated, the assistant computes candidate destinations that
    honour server-group (anti-)affinity. It then picks one destination per
    instance while tracking the resources consumed by the migrations it has
    already suggested.
    """

    def __init__(self, params):
        """Initialize state.

        :param params: parsed CLI arguments; ``params.host`` is the host
            being evacuated.
        """
        self.cfg = params
        self.evac_host = self.cfg.host
        # VM ID -> {'anti_vms', 'aff_vms', 'candidate_hosts'}
        self.dest_map = {}
        # VM ID -> destination host already suggested for it
        self.in_progress = {}
        # Raw JSON output of the corresponding "openstack ... list" commands
        self.groups_json = {}
        self.vms_json = {}
        self.hosts_json = {}
        self.flavors_json = {}
        self.res_prov_json = {}
        # Lookup tables built from the raw JSON above
        self.vms_indexed = {}
        self.flavors_indexed = {}
        self.res_prov_indexed = {}
        self.hosts_indexed = {}

    def gather_info(self):
        """Gather the initial data from the cloud.

        Populates the raw ``*_json`` attributes and their ``*_indexed``
        lookup tables. ``hosts_indexed`` keeps only hypervisors whose state
        is ``up`` and that are not the host being evacuated, i.e. the valid
        destinations.

        Exits with status 1 when the Placement CLI commands are unusable or
        when the host to evacuate is not a known resource provider.
        """
        # Validate that the dependencies for "openstack resource provider"
        # commands are installed
        try:
            res_prov_string = run_cmd(
                "openstack resource provider list --format json")
        except CommandFailedException:
            res_prov_string = None
        if (res_prov_string is None or
                "not an openstack command" in res_prov_string):
            LOG.error("Make sure to install the required dependencies for "
                      "using Placement commands in OpenStackClient.")
            sys.exit(1)
        self.res_prov_json = json.loads(res_prov_string)
        self.res_prov_indexed = {x['name']: x for x in self.res_prov_json}
        if self.evac_host not in self.res_prov_indexed:
            LOG.error("Host to evacuate not found.")
            sys.exit(1)
        groups_string = run_cmd(
            "openstack server group list --all-projects --long --format json")
        self.groups_json = json.loads(groups_string)
        vms_string = run_cmd(
            "openstack server list --all-projects --long --format json")
        self.vms_json = json.loads(vms_string)
        self.vms_indexed = {x['ID']: x for x in self.vms_json}
        hosts_string = run_cmd(
            "openstack hypervisor list --long --format json")
        self.hosts_json = json.loads(hosts_string)
        # Filter out down hosts and the host being evacuated as valid
        # destinations
        self.hosts_indexed = {
            x['Hypervisor Hostname']: x for x in self.hosts_json
            if x['State'] == 'up' and
            x['Hypervisor Hostname'] != self.evac_host}
        flavors_string = run_cmd(
            "openstack flavor list --all --long --format json")
        self.flavors_json = json.loads(flavors_string)
        self.flavors_indexed = {x['ID']: x for x in self.flavors_json}

    @staticmethod
    def _as_list(value):
        """Normalize a CLI list column to a list of stripped strings.

        Server group ``Members``/``Policies`` may be rendered either as a
        comma-separated string or as a JSON list, depending on the
        OpenStackClient version (verify against the deployed client); both
        forms are accepted. Empty entries and falsy values yield nothing.
        """
        if not value:
            return []
        if isinstance(value, str):
            value = value.split(',')
        return [x.strip() for x in value if x and x.strip()]

    def calculate_destinations(self):
        """Compute peers and candidate hosts for every VM being evacuated.

        :returns: dict mapping each VM ID on the evacuated host to a dict
            with the sorted lists ``anti_vms``, ``aff_vms`` and
            ``candidate_hosts``.
        :raises AffinityDestinationConflict: if a VM's affinity peers
            already live on more than one other host.
        """
        dest_map = {}
        for vm in self.vms_json:
            # We only care about VMs to be evacuated
            if vm['Host'] != self.evac_host:
                continue
            anti_vms = set()
            aff_vms = set()
            # For each VM, collect its affinity and anti-affinity peers.
            # Membership is exact (list based), not a substring test.
            for group in self.groups_json:
                members = self._as_list(group['Members'])
                if vm['ID'] not in members:
                    continue
                policies = self._as_list(group['Policies'])
                peers = {x for x in members if x != vm['ID']}
                if 'anti-affinity' in policies:
                    anti_vms |= peers
                if 'affinity' in policies:
                    aff_vms |= peers
            anti_vms = sorted(anti_vms)
            aff_vms = sorted(aff_vms)
            # Based on (anti-)affinity, build the host constraints
            anti_hosts = set()
            for antivm_id in anti_vms:
                antivm = self.vms_indexed.get(antivm_id)
                if antivm is None:
                    LOG.warning("Server group member {} of VM {} not found "
                                "in the server list; ignoring it.".format(
                                    antivm_id, vm['ID']))
                    continue
                anti_hosts.add(antivm['Host'])
            aff_hosts = set()
            for affvm_id in aff_vms:
                affvm = self.vms_indexed.get(affvm_id)
                if affvm is None:
                    LOG.warning("Server group member {} of VM {} not found "
                                "in the server list; ignoring it.".format(
                                    affvm_id, vm['ID']))
                    continue
                # A VM with affinity may already have been migrated; a peer
                # without any host does not constrain the placement
                if affvm['Host'] and affvm['Host'] != self.evac_host:
                    aff_hosts.add(affvm['Host'])
            # Error condition if VMs with affinity are in different hosts
            # where neither is the host being evacuated
            if len(aff_hosts) > 1:
                raise AffinityDestinationConflict(
                    "VM {} has affinity with 2 VMs in different "
                    "hosts: {}".format(vm['ID'], sorted(aff_hosts)))
            # Destination mapping info
            dest_map[vm['ID']] = {
                'anti_vms': anti_vms,
                'aff_vms': aff_vms,
            }
            if aff_hosts:
                # The only candidate for a VM with affinity is the host its
                # already-migrated peers are on, and only if that host is a
                # valid destination (up and not being evacuated)
                candidate_hosts = [x for x in aff_hosts
                                   if x in self.hosts_indexed]
                if not candidate_hosts:
                    LOG.warning("Affinity host(s) {} of VM {} are not valid "
                                "destinations.".format(sorted(aff_hosts),
                                                       vm['ID']))
            else:
                # Otherwise any valid host not restricted by anti-affinity
                candidate_hosts = [x for x in self.hosts_indexed
                                   if x not in anti_hosts]
            dest_map[vm['ID']]['candidate_hosts'] = candidate_hosts
        # Intersect hosts of members within the same group for VMs with
        # affinity, so they all end up with the same candidates
        for dest_data in dest_map.values():
            for aff_vm in dest_data['aff_vms']:
                aff_vm_data = dest_map.get(aff_vm)
                if aff_vm_data:
                    dest_data['candidate_hosts'] = list(
                        set(dest_data['candidate_hosts']) &
                        set(aff_vm_data['candidate_hosts']))
            dest_data['candidate_hosts'].sort()
        LOG.info("Base data: {}".format(
            json.dumps(dest_map, sort_keys=True, indent=2)))
        return dest_map

    def select_host(self, vm_id):
        """Select a destination host for a VM.

        A VM whose affinity peer already has a suggested destination follows
        it to that host, without a resource check. Otherwise, hosts already
        chosen for its anti-affinity peers are dropped from its candidates
        (this mutates ``self.dest_map``). The first remaining candidate with
        enough resources is returned.

        :param vm_id: ID of a VM present in ``self.dest_map``.
        :returns: the selected hostname, or None if no host fits.
        """
        dest_data = self.dest_map[vm_id]
        # Currently a VM cannot be in both anti-affinity and affinity groups
        for aff_vm in dest_data['aff_vms']:
            if self.in_progress.get(aff_vm):
                # If there is a migration in progress, we assume all VMs
                # with affinity are migrating to the same host
                return self.in_progress[aff_vm]
        candidates = dest_data['candidate_hosts']
        for anti_vm in dest_data['anti_vms']:
            taken_host = self.in_progress.get(anti_vm)
            # A host taken by an anti-affinity peer is no longer a
            # candidate. Several peers (from different groups) may share a
            # destination, and a peer's destination may never have been a
            # candidate of this VM, so only remove hosts that are present.
            if taken_host and taken_host in candidates:
                candidates.remove(taken_host)
        selected_hosts = self.filter_hosts(list(candidates), vm_id)
        return selected_hosts[0] if selected_hosts else None

    def calculate_initial_remaining_resources(self, host):
        """Compute the resources still available on ``host``.

        Stores ``Inventory`` (Placement inventory), ``Details`` (hypervisor
        details) and the ``Memory MB Remaining``, ``vCPUs Remaining`` and
        ``Disk Remaining`` figures in ``self.hosts_indexed[host]``.

        :param host: hypervisor hostname present in both ``hosts_indexed``
            and ``res_prov_indexed``.
        """
        host_data = self.hosts_indexed[host]
        # Allocation ratio data is in the Placement inventory
        inventory_string = run_cmd(
            "openstack resource provider inventory list {} "
            "--format json".format(self.res_prov_indexed[host]['uuid']))
        # disk_available_least data is in the hypervisor details
        hypervisor_data_string = run_cmd(
            "openstack hypervisor show {} --format json".format(host))
        host_data['Inventory'] = json.loads(inventory_string)
        host_data['Details'] = json.loads(hypervisor_data_string)
        mem_alloc_ratio = next(
            x['allocation_ratio'] for x in host_data['Inventory']
            if x['resource_class'] == "MEMORY_MB")
        vcpu_alloc_ratio = next(
            x['allocation_ratio'] for x in host_data['Inventory']
            if x['resource_class'] == "VCPU")
        # Remaining "virtual" amounts include the overcommit allowed by the
        # allocation ratios
        host_data["Memory MB Remaining"] = (
            host_data["Memory MB"] * mem_alloc_ratio -
            host_data["Memory MB Used"])
        host_data["vCPUs Remaining"] = (
            host_data["vCPUs"] * vcpu_alloc_ratio -
            host_data["vCPUs Used"])
        # disk_available_least determines whether block migrations can
        # be successful
        host_data["Disk Remaining"] = host_data['Details'][
            'disk_available_least']

    def adjust_remaining_resources(self, vm_id, host):
        """Subtract a VM's flavor from ``host``'s remaining resources.

        Called after a migration of ``vm_id`` to ``host`` is suggested.
        Disk is only subtracted for VMs that are not volume backed.
        """
        vm_data = self.vms_indexed[vm_id]
        vm_flavor = self.flavors_indexed[vm_data['Flavor ID']]
        host_data = self.hosts_indexed[host]
        host_data["Memory MB Remaining"] = (host_data["Memory MB Remaining"] -
                                            vm_flavor['RAM'])
        host_data["vCPUs Remaining"] = (host_data["vCPUs Remaining"] -
                                        vm_flavor['VCPUs'])
        # Volume backed instances are not validated against
        # disk_available_least
        if not self.is_volume_backed(vm_id):
            host_data["Disk Remaining"] = (
                host_data["Disk Remaining"] - vm_flavor['Disk'] -
                vm_flavor['Ephemeral'])

    def filter_hosts(self, selected_hosts, vm_id):
        """Keep only the hosts with enough remaining resources for a VM.

        :param selected_hosts: candidate hostnames (keys of
            ``hosts_indexed``) in preference order.
        :param vm_id: ID of the VM to place.
        :returns: the hosts, in their original order, whose remaining RAM
            and vCPUs (and disk, unless the VM is volume backed) cover the
            VM's flavor.
        """
        vm_data = self.vms_indexed[vm_id]
        vm_flavor = self.flavors_indexed[vm_data['Flavor ID']]
        # Loop invariant: volume backed instances skip the disk check
        check_disk = not self.is_volume_backed(vm_id)
        needed_disk = (vm_flavor['Disk'] + vm_flavor['Ephemeral']
                       if check_disk else 0)
        filtered_hosts = []
        for host in selected_hosts:
            host_data = self.hosts_indexed[host]
            # Any host with less than the required VCPU, RAM or disk is
            # filtered out
            if (vm_flavor['RAM'] > host_data["Memory MB Remaining"] or
                    vm_flavor['VCPUs'] > host_data["vCPUs Remaining"] or
                    (check_disk and
                     needed_disk > host_data["Disk Remaining"])):
                continue
            filtered_hosts.append(host)
        return filtered_hosts

    def build_cmd(self, host, vm_id):
        """Build the live-migration command moving ``vm_id`` to ``host``."""
        dest_data = self.dest_map[vm_id]
        cmd = "openstack server migrate {}".format(vm_id)
        if not self.is_volume_backed(vm_id):
            cmd += " --block-migration"
        if dest_data['aff_vms']:
            # In case of VMs in an affinity group, we need to bypass the
            # scheduler
            return cmd + " --live {}".format(host)
        return cmd + (
            " --live-migration --host {} "
            "--os-compute-api-version 2.30".format(host)
        )

    def is_volume_backed(self, vm_id):
        """Return True if the VM is booted from volume.

        Relies on the ``Image ID`` column containing "booted from volume"
        for such servers. A missing, None or empty image ID is treated as
        not volume backed.
        """
        image_id = self.vms_indexed[vm_id].get('Image ID') or ''
        return "booted from volume" in image_id.lower()

    def evacuate(self):
        """Choose a destination for every VM on the evacuated host.

        Logs the suggested VM -> host mapping, the VMs for which no
        destination could be found, and the migration commands to run.
        """
        commands = []
        failed = []
        for host in self.hosts_indexed:
            self.calculate_initial_remaining_resources(host)
        for vm_id in self.dest_map:
            LOG.debug("Starting analysis for VM {}".format(vm_id))
            # Select a host considering other suggested migrations and
            # the remaining resources
            host = self.select_host(vm_id)
            LOG.debug("Selected possible host for VM {} : {}".format(
                vm_id, host))
            if not host:
                # We may not be able to find a proper destination for a
                # given instance
                failed.append(vm_id)
                continue
            cmd = self.build_cmd(host, vm_id)
            LOG.debug(
                "Choosing to migrate VM {} to host {}".format(vm_id, host))
            commands.append(cmd)
            # Register the VM and its destination, and readjust the
            # remaining resources
            self.in_progress[vm_id] = host
            self.adjust_remaining_resources(vm_id, host)
        LOG.info("Migrations suggested: {}".format(
            json.dumps(self.in_progress, sort_keys=True, indent=2)))
        if failed:
            LOG.error("Failed to find a destination host for the "
                      "following VMs: \n{}".format('\n'.join(failed)))
        LOG.info("Commands: \n{}".format('\n'.join(commands)))

    @staticmethod
    def check_dependencies():
        """Exit with status 1 unless OpenStack credentials are sourced."""
        if 'OS_AUTH_URL' not in os.environ:
            LOG.error("Please source your credentials first.")
            sys.exit(1)

    def run(self):
        """Run the whole analysis: gather data, plan, log the commands."""
        self.check_dependencies()
        self.gather_info()
        self.dest_map = self.calculate_destinations()
        LOG.debug("Hosts data before migrations: {}".format(
            json.dumps(self.hosts_indexed, sort_keys=True, indent=2)))
        self.evacuate()
        LOG.debug("Hosts data after migrations: {}".format(
            json.dumps(self.hosts_indexed, sort_keys=True, indent=2)))
if __name__ == '__main__':
    # Script entry point: parse the CLI arguments, then run the analysis.
    EvacuationAssistant(parse_args(sys.argv[1:])).run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment