Last active
February 10, 2021 17:51
-
-
Save rodrigogansobarbieri/bb7d6c6e0ccfc26191ef2e2e27265653 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Evacuation assistance script to assist in finding destinations for
evacuating instances that are in affinity or anti-affinity groups,
as a workaround for bug https://bugs.launchpad.net/bugs/1821755.

Authors:
  - rodrigo.barbieri@canonical.com
Latest Code:
  - https://gist.github.com/rodrigogansobarbieri/bb7d6c6e0ccfc26191ef2e2e27265653
Unit tests:
  - https://gist.github.com/rodrigogansobarbieri/9e40d5746cd18e4c06a3c1cc7aece287
"""
import argparse | |
import json | |
import logging | |
import os | |
import subprocess | |
import sys | |
# Module-level logger shared by every function and class in this script.
# Its level and output format are set by logging.basicConfig() in parse_args().
LOG = logging.getLogger(__name__)
def parse_args(params):
    """Build the CLI parser, parse ``params`` and configure logging.

    :param params: list of command-line tokens (typically ``sys.argv[1:]``).
    :returns: the ``argparse.Namespace`` holding ``host`` and ``loglevel``.
    """
    cli = argparse.ArgumentParser()

    # Supported options
    cli.add_argument(
        '--host',
        type=str,
        required=True,
        help="Host being evacuated.",
    )
    cli.add_argument(
        '--loglevel',
        choices=['INFO', 'DEBUG'],
        default='INFO',
        required=False,
        help="Set the log level. Default: 'INFO'.",
    )

    arguments = cli.parse_args(params)

    # Logging is configured here so the requested level applies to
    # everything logged afterwards.
    logging.basicConfig(
        level=arguments.loglevel,
        format='%(levelname)s: %(message)s',
    )

    parsed_values = vars(arguments)
    LOG.debug("Running with arguments: {}".format(parsed_values))
    return arguments
def run_cmd(cmd):
    """Run a command and return its decoded standard output.

    The command string is split on whitespace and executed without a shell,
    so arguments containing spaces are not supported.

    :param cmd: command line to execute, as a single string.
    :returns: the command's stdout decoded to ``str``.
    :raises CommandFailedException: if the command exits with a non-zero
        return code; the message carries the return code, stdout and stderr.
    """
    LOG.debug("Running command: {}".format(cmd))

    # sys.stdout.encoding may be None (e.g. when stdout has been replaced by
    # an object without an encoding); fall back to UTF-8 instead of failing
    # inside bytes.decode().
    encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'

    pipes = subprocess.Popen(
        cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = pipes.communicate()

    # With both streams piped, communicate() always returns bytes, so decode
    # unconditionally. Undecodable bytes are replaced rather than aborting
    # the run with a UnicodeDecodeError.
    stdout = stdout.decode(encoding, errors='replace')
    stderr = stderr.decode(encoding, errors='replace')

    msg = ("Result of command {}\n"
           "return code: {}\n"
           "stdout: {}\n"
           "stderr: {}".format(cmd, pipes.returncode, stdout, stderr))

    if pipes.returncode != 0:
        raise CommandFailedException(msg)

    LOG.debug(msg)
    return stdout
class AffinityDestinationConflict(Exception):
    """Raised when a VM's affinity peers sit on more than one other host."""

    pass
class CommandFailedException(Exception):
    """Raised when an executed command exits with a non-zero return code."""

    pass
class EvacuationAssistant(object):
    """Analyzes data and presents commands for suggested migrations.

    The workflow (see ``run``) is: check credentials, gather data through
    ``openstack`` CLI calls, compute candidate destinations for each VM on
    the host being evacuated while honouring server-group (anti-)affinity,
    then pick one destination per VM based on remaining resources and log
    the resulting migration commands. The commands are only logged; this
    class never executes a migration itself.
    """

    def __init__(self, params):
        """Initialize variables.

        :param params: parsed arguments; only ``params.host`` (the host
            being evacuated) is read here.
        """
        self.cfg = params
        self.evac_host = self.cfg.host
        # vm_id -> {'anti_vms', 'aff_vms', 'candidate_hosts'}
        self.dest_map = {}
        # vm_id -> destination host already chosen during this run
        self.in_progress = {}
        # Raw JSON payloads as returned by the CLI
        self.groups_json = {}
        self.vms_json = {}
        self.hosts_json = {}
        self.flavors_json = {}
        self.res_prov_json = {}
        # The same payloads indexed by ID / name for direct lookup
        self.vms_indexed = {}
        self.flavors_indexed = {}
        self.res_prov_indexed = {}
        self.hosts_indexed = {}

    def gather_info(self):
        """Gather initial data.

        Populates the ``*_json`` and ``*_indexed`` attributes from CLI
        output. Exits the process with status 1 if the resource provider
        listing fails or if the host to evacuate is not among the resource
        providers.

        :raises CommandFailedException: if any of the later listing commands
            fails.
        """
        # Validate if dependencies for "openstack resource provider" commands
        # are installed
        error = False
        res_prov_string = None
        try:
            res_prov_string = run_cmd(
                "openstack resource provider list --format json")
        except CommandFailedException:
            error = True
        else:
            if "not an openstack command" in res_prov_string:
                error = True
        if error:
            LOG.error("Make sure to install the required dependencies for "
                      "using Placement commands in OpenStackClient.")
            # sys.exit() rather than the builtin exit(): the latter is
            # injected by the site module and is not always available.
            sys.exit(1)

        self.res_prov_json = json.loads(res_prov_string)
        self.res_prov_indexed = {x['name']: x for x in self.res_prov_json}
        if self.evac_host not in self.res_prov_indexed:
            LOG.error("Host to evacuate not found.")
            sys.exit(1)

        groups_string = run_cmd(
            "openstack server group list --all-projects --long --format json")
        self.groups_json = json.loads(groups_string)

        vms_string = run_cmd(
            "openstack server list --all-projects --long --format json")
        self.vms_json = json.loads(vms_string)
        self.vms_indexed = {x['ID']: x for x in self.vms_json}

        hosts_string = run_cmd(
            "openstack hypervisor list --long --format json")
        self.hosts_json = json.loads(hosts_string)
        # Filter out down hosts and the host being
        # evacuated as valid destinations
        self.hosts_indexed = {
            x['Hypervisor Hostname']: x
            for x in self.hosts_json
            if x['State'] == 'up' and
            x['Hypervisor Hostname'] != self.evac_host
        }

        flavors_string = run_cmd(
            "openstack flavor list --all --long --format json")
        self.flavors_json = json.loads(flavors_string)
        self.flavors_indexed = {x['ID']: x for x in self.flavors_json}

    def _current_host(self, vm_id):
        """Return the host recorded for ``vm_id``, or None if unknown.

        None is returned both when the VM is absent from the server listing
        and when its ``Host`` field is empty; in either case the VM cannot
        constrain the choice of destination.
        """
        vm = self.vms_indexed.get(vm_id)
        if vm is None:
            LOG.warning("Server group member {} is not in the server "
                        "list; ignoring it.".format(vm_id))
            return None
        return vm['Host'] or None

    def calculate_destinations(self):
        """Calculate all possibilities based on (anti-)affinity policies.

        :returns: dict mapping each VM ID on the evacuated host to a dict
            with its sorted ``anti_vms``, sorted ``aff_vms`` and sorted
            ``candidate_hosts``.
        :raises AffinityDestinationConflict: if a VM's affinity peers are
            spread over more than one host other than the evacuated one.
        """
        dest_map = {}
        for vm in self.vms_json:
            # We only care about VMs to be evacuated
            if vm['Host'] != self.evac_host:
                continue

            anti_vms = []
            aff_vms = []
            # For each VM, we will build a list of
            # affinity and anti-affinity VMs
            for group in self.groups_json:
                if vm['ID'] not in group['Members']:
                    continue
                peers = [x.strip() for x in group['Members'].split(',')
                         if x.strip() != vm['ID']]
                if group['Policies'] == 'anti-affinity':
                    anti_vms += peers
                if group['Policies'] == 'affinity':
                    aff_vms += peers

            # Remove duplicates and keep a stable order
            anti_vms = sorted(set(anti_vms))
            aff_vms = sorted(set(aff_vms))

            # And based on (anti-)affinity, build a list of host candidates.
            # Peers with no known host are skipped (see _current_host).
            anti_hosts = set()
            for antivm_id in anti_vms:
                peer_host = self._current_host(antivm_id)
                if peer_host is not None:
                    anti_hosts.add(peer_host)

            aff_hosts = set()
            for affvm_id in aff_vms:
                peer_host = self._current_host(affvm_id)
                # A VM with affinity may already have been migrated
                if peer_host is not None and peer_host != self.evac_host:
                    aff_hosts.add(peer_host)
            aff_hosts = sorted(aff_hosts)

            # Error condition if 2 VMs with affinities are in different hosts
            # where neither is the host being evacuated
            if len(aff_hosts) > 1:
                raise AffinityDestinationConflict(
                    "VM {} has affinity with 2 VMs in different "
                    "hosts: {}".format(vm['ID'], aff_hosts))

            # The only candidates for hosts with affinity are hosts where a VM
            # would have already migrated
            if aff_hosts:
                candidate_hosts = aff_hosts
            else:
                # Otherwise any host where not restricted by anti-affinity
                candidate_hosts = [x for x in self.hosts_indexed
                                   if x not in anti_hosts]

            # Destination mapping info
            dest_map[vm['ID']] = {
                'anti_vms': anti_vms,
                'aff_vms': aff_vms,
                'candidate_hosts': candidate_hosts,
            }

        # Intersect hosts of members within the same
        # group for VMs with affinity
        for dest_data in dest_map.values():
            for aff_vm in dest_data['aff_vms']:
                aff_vm_data = dest_map.get(aff_vm)
                if aff_vm_data:
                    dest_data['candidate_hosts'] = list(
                        set(dest_data['candidate_hosts']) &
                        set(aff_vm_data['candidate_hosts']))
            dest_data['candidate_hosts'].sort()

        LOG.info("Base data: {}".format(
            json.dumps(dest_map, sort_keys=True, indent=2)))
        return dest_map

    def select_host(self, vm_id):
        """Perform selection of specific destinations for each VM.

        :param vm_id: ID of a VM present in ``self.dest_map``.
        :returns: the chosen destination host name, or None when no
            candidate has enough remaining resources.

        Note: when an affinity peer already has a destination in
        ``self.in_progress``, that host is returned as-is, without a
        resource check. Hosts taken by in-progress anti-affinity peers are
        removed from this VM's ``candidate_hosts`` in place.
        """
        dest_data = self.dest_map[vm_id]

        # Currently a VM cannot be in both anti-affinity and affinity groups
        for aff_vm in dest_data['aff_vms']:
            if self.in_progress.get(aff_vm):
                # If there is a migration in progress, we assume all VMs
                # with affinity are migrating to the same host
                return self.in_progress[aff_vm]

        candidates = dest_data['candidate_hosts']
        for anti_vm in dest_data['anti_vms']:
            taken_host = self.in_progress.get(anti_vm)
            # If there is a migration in progress, then that host is no
            # longer a candidate. The membership test avoids a ValueError
            # from list.remove() when the host is not (or no longer) in the
            # candidate list.
            if taken_host and taken_host in candidates:
                candidates.remove(taken_host)

        selected_hosts = self.filter_hosts(list(candidates), vm_id)
        return selected_hosts[0] if selected_hosts else None

    def calculate_initial_remaining_resources(self, host):
        """Obtain resource data to calculate initial available resources.

        Runs two CLI commands for ``host`` and stores, on its entry in
        ``self.hosts_indexed``, the keys ``Inventory``, ``Details``,
        ``Memory MB Remaining``, ``vCPUs Remaining`` and ``Disk Remaining``.

        :param host: hypervisor hostname present in ``self.hosts_indexed``
            and ``self.res_prov_indexed``.
        """
        host_data = self.hosts_indexed[host]

        # Allocation ratio data is in the inventory
        inventory_string = run_cmd(
            "openstack resource provider inventory list {} "
            "--format json".format(self.res_prov_indexed[host]['uuid']))
        # disk_available_least data is in hypervisor details
        hypervisor_data_string = run_cmd("openstack hypervisor show {}"
                                         " --format json".format(host))
        host_data['Inventory'] = json.loads(inventory_string)
        host_data['Details'] = json.loads(hypervisor_data_string)

        mem_alloc_ratio = next(
            x['allocation_ratio'] for x in host_data['Inventory']
            if x['resource_class'] == "MEMORY_MB")
        vcpu_alloc_ratio = next(
            x['allocation_ratio'] for x in host_data['Inventory']
            if x['resource_class'] == "VCPU")

        # Remaining "virtual" amount includes all
        # resources that can be overcommitted
        host_data["Memory MB Remaining"] = (
            (host_data["Memory MB"] * mem_alloc_ratio) -
            host_data["Memory MB Used"]
        )
        host_data["vCPUs Remaining"] = (
            (host_data["vCPUs"] * vcpu_alloc_ratio) -
            host_data["vCPUs Used"]
        )
        # disk_available_least value determines whether
        # block migrations can be successful
        host_data["Disk Remaining"] = host_data['Details'][
            'disk_available_least']

    def adjust_remaining_resources(self, vm_id, host):
        """Readjust remaining values after a successful migration request.

        Subtracts the VM flavor's RAM and VCPUs (and, for VMs that are not
        volume backed, Disk + Ephemeral) from the host's remaining values.

        :param vm_id: ID of the VM being placed.
        :param host: destination host name in ``self.hosts_indexed``.
        """
        vm_data = self.vms_indexed[vm_id]
        vm_flavor = self.flavors_indexed[vm_data['Flavor ID']]
        host_data = self.hosts_indexed[host]

        host_data["Memory MB Remaining"] -= vm_flavor['RAM']
        host_data["vCPUs Remaining"] -= vm_flavor['VCPUs']
        # volume backed instances are not validated
        # against disk_available_least
        if not self.is_volume_backed(vm_id):
            host_data["Disk Remaining"] = (
                host_data["Disk Remaining"] - vm_flavor['Disk'] -
                vm_flavor['Ephemeral'])

    def filter_hosts(self, selected_hosts, vm_id):
        """Filters hosts based on resources.

        :param selected_hosts: iterable of candidate host names.
        :param vm_id: ID of the VM that has to fit.
        :returns: list of the hosts, in input order, that have enough
            remaining memory, vCPUs and (unless the VM is volume backed)
            disk for the VM's flavor.
        """
        vm_data = self.vms_indexed[vm_id]
        vm_flavor = self.flavors_indexed[vm_data['Flavor ID']]
        needs_local_disk = not self.is_volume_backed(vm_id)
        disk_needed = vm_flavor['Disk'] + vm_flavor['Ephemeral']

        filtered_hosts = []
        for host in selected_hosts:
            host_data = self.hosts_indexed.get(host)
            if host_data is None:
                # Candidates derived from affinity peers come from the
                # server listing and may name a host that was filtered out
                # of hosts_indexed (e.g. not 'up'); such a host cannot be a
                # destination.
                continue
            # Any host with less than the required VCPU,
            # RAM or DISK is filtered
            if vm_flavor['RAM'] > host_data["Memory MB Remaining"]:
                continue
            if vm_flavor['VCPUs'] > host_data["vCPUs Remaining"]:
                continue
            if needs_local_disk and disk_needed > host_data["Disk Remaining"]:
                continue
            filtered_hosts.append(host)
        return filtered_hosts

    def build_cmd(self, host, vm_id):
        """Build command to perform migration.

        :param host: destination host name.
        :param vm_id: ID of the VM to migrate (must be in ``self.dest_map``).
        :returns: the ``openstack server migrate`` command line as a string.
        """
        dest_data = self.dest_map[vm_id]
        cmd = "openstack server migrate {}".format(vm_id)
        if not self.is_volume_backed(vm_id):
            cmd += " --block-migration"
        if dest_data['aff_vms']:
            # In case of VMs in an affinity group,
            # we need to bypass the scheduler
            return cmd + " --live {}".format(host)
        return cmd + (
            " --live-migration --host {} "
            "--os-compute-api-version 2.30".format(host)
        )

    def is_volume_backed(self, vm_id):
        """Checks if VM is booted from volume.

        :returns: True when the VM's ``Image ID`` field contains the text
            "booted from volume" (case-insensitive).
        """
        return ("booted from volume" in
                self.vms_indexed[vm_id]['Image ID'].lower())

    def evacuate(self):
        """Evacuate all VMs from host.

        Chooses a destination for every VM in ``self.dest_map``, records it
        in ``self.in_progress`` and logs the suggested commands. VMs with no
        suitable destination are reported through an error log entry.
        """
        commands = []
        failed = []

        for candidate in self.hosts_indexed:
            self.calculate_initial_remaining_resources(candidate)

        for vm_id in self.dest_map:
            LOG.debug("Starting analysis for VM {}".format(vm_id))
            # Select a host considering other running migrations and resources
            host = self.select_host(vm_id)
            LOG.debug("Selected possible host for VM {} : {}".format(
                vm_id, host))
            if not host:
                # We may not be able to find a proper destination
                # for a given instance
                failed.append(vm_id)
                continue
            cmd = self.build_cmd(host, vm_id)
            LOG.debug(
                "Choosing to migrate VM {} to host {}".format(vm_id, host))
            commands.append(cmd)
            # Register the VM and its destination,
            # and readjust remaining resources
            self.in_progress[vm_id] = host
            self.adjust_remaining_resources(vm_id, host)

        LOG.info("Migrations suggested: {}".format(
            json.dumps(self.in_progress, sort_keys=True, indent=2)))
        if failed:
            LOG.error("Failed to find a destination host for the "
                      "following VMs: \n{}".format('\n'.join(failed)))
        LOG.info("Commands: \n{}".format('\n'.join(commands)))

    @staticmethod
    def check_dependencies():
        """Check for dependencies required to run the rest of the script.

        Exits the process with status 1 when ``OS_AUTH_URL`` is not set in
        the environment.
        """
        if 'OS_AUTH_URL' not in os.environ:
            LOG.error("Please source your credentials first.")
            # sys.exit() rather than the builtin exit(): the latter is
            # injected by the site module and is not always available.
            sys.exit(1)

    def run(self):
        """Main logic"""
        self.check_dependencies()
        self.gather_info()
        self.dest_map = self.calculate_destinations()
        LOG.debug("Hosts data before migrations: {}".format(
            json.dumps(self.hosts_indexed, sort_keys=True, indent=2)))
        self.evacuate()
        LOG.debug("Hosts data after migrations: {}".format(
            json.dumps(self.hosts_indexed, sort_keys=True, indent=2)))
if __name__ == '__main__':
    # Script entry point: parse the CLI arguments (skipping the program
    # name) and run the evacuation analysis.
    cli_arguments = parse_args(sys.argv[1:])
    assistant = EvacuationAssistant(cli_arguments)
    assistant.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment