Skip to content

Instantly share code, notes, and snippets.

@patrickjahns
Created September 3, 2016 18:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save patrickjahns/cfa90a39883206e18fdaccfd9d2809f0 to your computer and use it in GitHub Desktop.
Save patrickjahns/cfa90a39883206e18fdaccfd9d2809f0 to your computer and use it in GitHub Desktop.
Simple script to switch profiles for vcpu assignment
import sys
import libvirt
import json
import os.path
import argparse
from xml.etree import ElementTree
VCPU_FILE = "vcpu.json"
class VCPUPin:
def __init__(self, url, vcpumaps, ignored_domains=None):
self.conn = None
self.libvirturl = url
self.ignored_domains = ignored_domains
self.vcpumaps = json.loads(json.dumps(vcpumaps))
self.oldpins = {}
self.load_vcpumapping()
self.connect()
self.disconnect()
def __del__(self):
self.disconnect()
def connect(self):
if self.conn is None:
self.conn = libvirt.open(self.libvirturl)
if self.conn == None:
raise Exception("Failed to open connection to libvirt")
def disconnect(self):
if self.conn is not None:
self.conn.close()
self.conn = None
def load_vcpumapping(self):
if os.path.exists(VCPU_FILE):
with open(VCPU_FILE, "r") as f:
self.oldpins = json.loads(f.read())
def save_vcpumapping(self):
with open(VCPU_FILE, "w") as f:
f.write(json.dumps(self.oldpins))
def get_all_domains(self):
self.connect()
domainNames = []
domainIDs = self.conn.listDomainsID()
if domainIDs == None:
raise Exception("Failed to get a list of domain IDs")
if len(domainIDs) != 0:
for domainID in domainIDs:
domain = self.conn.lookupByID(domainID)
domainNames.append(domain.name())
self.disconnect()
return domainNames
def restore_domain_vcpumap(self, domName):
self.set_domain_vcpumap(domName, None)
def set_domain_vcpumap(self, domName, profile):
if profile is not None and profile not in self.vcpumaps:
raise Exception("unknown profile " + str(profile))
if profile is not None and domName not in self.vcpumaps[profile] and "default" not in self.vcpumaps[profile]:
raise Exception("unknown domain mapping "+"'"+ domName +"'"+ "and no default profile")
self.connect()
dom = self.conn.lookupByName(domName)
if dom == None:
raise Exception("Failed to find domain " + domName)
if dom.isActive():
vcpuinfo = dom.vcpus()
if profile is not None:
if domName in self.vcpumaps[profile]:
if "all" in self.vcpumaps[profile][domName]:
if len(self.vcpumaps[profile][domName]["all"]) != len(vcpuinfo[1][0]):
raise Exception("PROFILE " +"'"+ profile +"'"+ " DOMAIN" +"'"+ domName +"'"+" requires "+str(len(vcpuinfo[1][0]))+" VCPU entries")
else:
for x in range(len(vcpuinfo[0])):
if x not in self.vcpumaps[profile][domName]:
raise Exception("PROFILE: " +"'"+ profile +"'"+ " DOMAIN: " +"'"+ domName +" missing VCPUID "+str(x))
if len(self.vcpumaps[profile][domName][str(x)]) != len(vcpuinfo[1][0]):
raise Exception("PROFILE " +"'"+ profile +"'"+ " DOMAIN: " +"'"+ domName +" VCPUID "+str(x) +" requires exact "+str(len(vcpuinfo[1][0]))+" entries")
else:
if len(self.vcpumaps[profile]["default"]) != len(vcpuinfo[1][0]):
raise Exception("default mapping for profile"+"'"+ profile +"'"+" requires exact "+str(len(vcpuinfo[1][0]))+" entries")
for x in range(len(vcpuinfo[0])):
vcpuid = vcpuinfo[0][x][0]
if profile is None:
if domName in self.oldpins:
dom.pinVcpuFlags(vcpuid, tuple(self.oldpins[domName][str(vcpuid)]))
else:
if not domName in self.oldpins:
self.oldpins[domName] = {}
self.oldpins[domName][str(vcpuid)] = vcpuinfo[1][x]
if domName in self.vcpumaps[profile]:
if "all" in self.vcpumaps[profile][domName]:
dom.pinVcpuFlags(vcpuid, tuple(self.vcpumaps[profile][domName]["all"]))
else:
dom.pinVcpuFlags(vcpuid, tuple(self.vcpumaps[profile][domName][str(x)]))
else:
dom.pinVcpuFlags(vcpuid, tuple(self.vcpumaps[profile]["default"]))
self.save_vcpumapping()
self.disconnect()
def restore_all_vcpumap(self):
domains = self.get_all_domains()
for domain in domains:
if domain in self.ignored_domains:
continue
self.restore_domain_vcpumap(domain)
def set_all_vcpumap(self, profile):
domains = self.get_all_domains()
for domain in domains:
if domain in self.ignored_domains:
continue
self.set_domain_vcpumap(domain, profile)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--vcpumap", required=True)
parser.add_argument("--libvirt", default="qemu:///system")
parser.add_argument("--profile")
parser.add_argument("--domain")
parser.add_argument("--restore", action="store_true", default=False)
parser.add_argument("--ignored_domains")
args = parser.parse_args()
if not os.path.exists(args.vcpumap):
print("could not find vcpumap file")
exit(1)
if args.profile is None and args.restore is False:
print("please provide a profile or use --restore")
exit(1)
with open(args.vcpumap, "r") as f:
vcpumap = json.load(f)
vcpupin = VCPUPin(args.libvirt, vcpumap, args.ignored_domains)
if args.domain is not None:
if not args.restore:
vcpupin.set_domain_vcpumap(args.domain, args.profile)
else:
vcpupin.restore_domain_vcpumap(args.domain)
else:
if not args.restore:
vcpupin.set_all_vcpumap(args.profile)
else:
vcpupin.restore_all_vcpumap()
{"default": {
"default": [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0],
"defined1": {"all": [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]},
"defined2": {
"0": [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0],
"1": [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]
}
}
}
@AxisAssasin
Copy link

Can you explain how to use this script please?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment