Skip to content

Instantly share code, notes, and snippets.

@ssato
Created May 31, 2009 07:15
Show Gist options
  • Save ssato/120803 to your computer and use it in GitHub Desktop.
Save ssato/120803 to your computer and use it in GitHub Desktop.
#! /usr/bin/python
#
# * install libvirt network and domain (guest) from xml file
# * uninstall libvirt network and domain (guest) by name
#
# Copyright (C) 2009 Satoru SATOH <satoru.satoh at gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import libxml2
import libvirt
import logging
import optparse
import os
import re
import shutil
import subprocess
import sys
import tempfile
# Default libvirt connection URI (used as the default of the --uri option).
VMM_URI = 'qemu:///system'
# Default destination directory used by relocate_images().
VMM_IMAGES_DIR = '/var/lib/libvirt/images'

# Status codes reported by the *_status helpers.
_NOT_DEFINED = 0
_DEFINED = 1
_ACTIVE = 2

# Command identifiers.
CMD_INSTALL = 0
CMD_UNINSTALL = 1

# Kinds of target handled by the install / uninstall helpers.
_TYPE_UNKNOWN = -1
_TYPE_DOMAIN = 0
_TYPE_NETWORK = 1
class ParseError(Exception):
    """Raised when an expected element is not found in an XML file."""
class ActiveTargetError(Exception):
    """Raised when an operation is refused because the target is active."""
class DefinedTargetError(Exception):
    """Error type for a target that is already defined.

    NOTE(review): nothing in this file raises it -- confirm it is still needed.
    """
def __xml_type(xml):
    """Return the name of the root element of the given XML file.

    For libvirt definitions this is e.g. 'domain' or 'network'.

    @param xml  Path to the XML file.
    @return  Name of the document's root element.
    @throw IOError, libxml2.parserError, etc.
    """
    doc = libxml2.parseFile(xml)
    try:
        # doc.children is the first node of the document, which may be a
        # comment or a processing instruction; ask for the root explicitly.
        return doc.getRootElement().name
    finally:
        # Documents built by the libxml2 bindings must be freed explicitly.
        doc.freeDoc()
def __name_by_xml(xml):
    """Get domain or network name from given xml file.

    The file is parsed once; the name is read from '/<root element>/name'.

    @param xml  Path to the domain or network XML file.
    @return  Text content of the first matching name element.
    @throw ParseError  if the document has no such element.
    @throw IOError, libxml2.parserError, etc.
    """
    doc = libxml2.parseFile(xml)
    try:
        xpath = '/%s/name' % doc.getRootElement().name
        try:
            return doc.xpathEval(xpath)[0].content
        except IndexError:
            raise ParseError("Xpath expression '%s' does not match" % xpath)
    finally:
        # Documents built by the libxml2 bindings must be freed explicitly.
        doc.freeDoc()
def __network_name_by_xml(network_xml):
    """Read the network name out of a libvirt network XML file.

    Logs an error and exits the process with status 1 when the file cannot
    be opened or contains no '/network/name' element.

    @param network_xml  Path to the network XML file.
    @return  The network's name.
    @see http://libvirt.org/formatnetwork.html
    """
    try:
        document = libxml2.parseFile(network_xml)
        name_nodes = document.xpathEval('/network/name')
        network_name = name_nodes[0].content
    except IOError:
        logging.error("Could not open '%s'" % network_xml)
        sys.exit(1)
    except IndexError:
        logging.error("Parse failed: '%s'" % network_xml)
        sys.exit(1)

    logging.debug("Network Name = '%s'" % network_name)
    return network_name
def __domain_name_by_xml(domain_xml):
    """Extract domain (guest) name from given domain xml file.

    Logs an error and exits the process with status 1 when the file cannot
    be opened or contains no '/domain/name' element.

    @param domain_xml  Path to the domain XML file.
    @return  The domain's name.
    @see http://libvirt.org/formatdomain.html
    """
    try:
        name = libxml2.parseFile(domain_xml).xpathEval('/domain/name')[0].content
    except IOError:
        # The messages must refer to this function's own argument.
        logging.error("Could not open '%s'" % domain_xml)
        sys.exit(1)
    except IndexError:
        logging.error("Parse failed: '%s'" % domain_xml)
        sys.exit(1)

    logging.debug("Domain Name = '%s'" % name)
    return name
def __network_status(conn, network_name):
    """Report whether the named network is active, defined or unknown.

    @param conn  Open libvirt connection.
    @param network_name  Name of the network to look for.
    @return  _ACTIVE if the name is in conn.listNetworks(), _DEFINED if it is
             in conn.listDefinedNetworks(), otherwise _NOT_DEFINED.
    """
    if network_name in conn.listNetworks():
        logging.debug("The network '%s' is active." % network_name)
        return _ACTIVE

    if network_name in conn.listDefinedNetworks():
        logging.debug("The network '%s' is defined but inactive." % network_name)
        return _DEFINED

    logging.debug("The network '%s' is not defined." % network_name)
    return _NOT_DEFINED
def __domain_status(conn, name):
    """Query the status of the domain.

    @param conn  Open libvirt connection.
    @param name  Name of the domain to look for.
    @return  _DEFINED if the domain is defined but inactive, _ACTIVE if it can
             be looked up and is not in the inactive list, otherwise
             _NOT_DEFINED.
    """
    # lookupByName() also succeeds for inactive domains, so the inactive list
    # has to be consulted first; otherwise a stopped guest would be reported
    # as active.
    if name in conn.listDefinedDomains():
        logging.debug("The domain '%s' is defined but inactive." % name)
        return _DEFINED

    try:
        conn.lookupByName(name)
    except libvirt.libvirtError:  # It indicates the domain is not found.
        logging.debug("The domain '%s' is not defined." % name)
        return _NOT_DEFINED

    logging.debug("The domain '%s' is active." % name)
    return _ACTIVE
def __copy_image(path, newpath):
    """Copy an image file, refusing to copy a file onto itself.

    @param path  Source file.
    @param newpath  Destination file or directory.
    @throw ValueError  if both paths resolve to the same file.
    @throw IOError, etc.  whatever shutil.copy2() raises.
    """
    # An explicit check rather than an assert: asserts vanish under 'python -O'.
    if os.path.realpath(path) == os.path.realpath(newpath):
        raise ValueError("src = '%s', dst = '%s'" % (path, newpath))

    logging.info("Copying %s to %s" % (path, newpath))
    shutil.copy2(path, newpath)
def __base_image_path(image):
    """Get the path of base (backing) file for given image (path).

    Runs 'qemu-img info' on the image and looks for a line such as

        backing file: test-base.qcow2 (actual path: /.../test-base.qcow2)

    NOTE: $image should be QCow2 file, I guess.

    @param image  Path of the image to inspect.
    @return  The 'actual path' reported for the backing file, or '' when the
             output has no such line.
    @throw OSError  if qemu-img cannot be executed.
    """
    pipe = subprocess.Popen(['qemu-img', 'info', image],
                            stdout=subprocess.PIPE, universal_newlines=True)
    output = pipe.communicate()[0]

    # Scan every line: the backing file entry is not guaranteed to be last.
    found = re.search(r'^backing file: \S+ \(actual path: ([^\)]+)\)',
                      output or '', re.MULTILINE)
    if found is None:
        return ''
    return found.group(1)
def __uninstall(uri, name, force, backup, type=_TYPE_NETWORK):
    """Uninstall the given target; domain or network.

    @param uri  libvirt connection URI.
    @param name  Name of the domain or network.
    @param force  Destroy the target first if it is active.
    @param backup  Path to save the target's XML description to, or None.
    @param type  _TYPE_NETWORK for a network; any other value means domain.
    @return  True if the target is no longer defined afterwards; False if it
             was not defined in the first place or is still defined.
    @throw ActiveTargetError  if the target is active and force is false.
    """
    conn = libvirt.open(uri)
    try:
        if type == _TYPE_NETWORK:
            stat_check_f = __network_status
            find_by_name_f = conn.networkLookupByName
        else:
            stat_check_f = __domain_status
            find_by_name_f = conn.lookupByName

        stat = stat_check_f(conn, name)
        if stat == _NOT_DEFINED:
            logging.warning("Target '%s' is not defined. Nothing to do..." % name)
            return False

        target = find_by_name_f(name)

        if backup is not None:
            out = open(backup, 'w')
            try:
                out.write('%s\n' % target.XMLDesc(0))
            finally:
                out.close()

        if stat == _ACTIVE:
            if not force:
                raise ActiveTargetError("Target '%s' is active." % name)
            logging.debug("Try destroying and uninstalling '%s'..." % name)
            target.destroy()  # FIXME: It should take some time to finish.
            target.undefine()
            logging.debug("... Done")
        elif stat == _DEFINED:
            logging.debug("Try uninstalling '%s'..." % name)
            target.undefine()
            logging.debug("... Done")

        return stat_check_f(conn, name) == _NOT_DEFINED
    finally:
        # Release the connection on every path, including errors.
        conn.close()
def __install(uri, xml, force, autostart, type=_TYPE_NETWORK):
    """Install the target (domain or network) defined in given XML file.

    The target is defined from the XML, optionally flagged for autostart,
    and then started.

    @param uri  libvirt connection URI.
    @param xml  Path to the domain or network XML file.
    @param force  Uninstall an existing target of the same name first.
    @param autostart  Mark the target to be autostarted.
    @param type  _TYPE_NETWORK for a network; any other value means domain.
    @return  True if the target is active afterwards.
    """
    name = __name_by_xml(xml)

    # The callables take the connection as an argument because the
    # connection is reopened between defining and looking up the target.
    if type == _TYPE_NETWORK:
        stat_check_f = __network_status
        uninstall_f = uninstall_network
        define_f = lambda conn, content: conn.networkDefineXML(content)
        find_by_name_f = lambda conn, name: conn.networkLookupByName(name)
    else:
        stat_check_f = __domain_status
        uninstall_f = uninstall_domain
        define_f = lambda conn, content: conn.defineXML(content)
        find_by_name_f = lambda conn, name: conn.lookupByName(name)

    if force:
        uninstall_f(uri, name, force)

    xml_file = open(xml)
    try:
        content = xml_file.read()
    finally:
        xml_file.close()

    conn = libvirt.open(uri)
    try:
        define_f(conn, content)
    finally:
        conn.close()

    # FIXME: Reopen connection. It seems that to re-connect is needed to apply
    # the changes above.
    conn = libvirt.open(uri)
    try:
        target = find_by_name_f(conn, name)

        if autostart:
            logging.debug("Making the target '%s' to be autostarted later." % name)
            target.setAutostart(True)

        logging.debug("Starts the target '%s' just created." % name)
        target.create()

        return stat_check_f(conn, name) == _ACTIVE
    finally:
        conn.close()
def __install_2(uri, xml, force, autostart):
    """Install the target defined in the XML file, detecting its kind.

    Does the same as __install(), but chooses between network and domain
    from the XML's root element instead of taking a type argument.

    @param uri  libvirt connection URI.
    @param xml  Path to the domain or network XML file.
    @param force  Uninstall an existing target of the same name first.
    @param autostart  Mark the target to be autostarted.
    @return  True if the target is active afterwards.
    @throw RuntimeError  if the root element is neither 'network' nor 'domain'.
    """
    xml_type = __xml_type(xml)

    if xml_type == 'network':
        target_type = _TYPE_NETWORK
    elif xml_type == 'domain':
        target_type = _TYPE_DOMAIN
    else:
        # Raised before any connection is opened, so nothing is leaked.
        raise RuntimeError("XML type '%s' is unknown. Aborting..." % xml_type)

    return __install(uri, xml, force, autostart, target_type)
# actions:
def confirm(action):
    """Ask the user to confirm an action; exit with status 0 unless 'yes'.

    @param action  Description of the action, inserted into the prompt.
    """
    answer = raw_input('Do you really want to %s [yes/No] >' % action)

    if answer.lower() != 'yes':
        logging.info("Aborting...")
        sys.exit(0)
def do_network_check(uri, name, **kwargs):
    """Check the status of the given network.

    @param uri  libvirt connection URI; opened read-only.
    @param name  Name of the network.
    @param kwargs  Accepted but ignored.
    @return  The status code returned by __network_status().
    """
    conn = libvirt.openReadOnly(uri)
    try:
        return __network_status(conn, name)
    finally:
        # Close the connection even if the status query raises.
        conn.close()
def install_images(xml, search_path=None):
    """Search images defined in $xml from $search_path and install these into
    the appropriate locations.

    Each file-backed disk in the domain XML is copied from
    $search_path/<basename> to the path named in the XML. Backing (base)
    images reported by qemu-img are copied next to the image that needs them.

    $search_path will be set to dirname($xml) if $search_path is not given.

    @param xml  Path to the domain XML file.
    @param search_path  Directory holding the source images, or None.
    """
    doc = libxml2.parseFile(xml)
    try:
        image_paths = [
            x.content for x in doc.xpathEval(
                '/domain/devices/disk[@type="file" and @device="disk"]/source/@file'
            )
            if x and getattr(x, 'content', False)
        ]
    finally:
        # Documents built by the libxml2 bindings must be freed explicitly.
        doc.freeDoc()

    # Search images in the dir
    #   a. where $xml exists if search_path is None
    #   b. search_path if search_path is NOT None
    if search_path is None:
        search_path = os.path.dirname(xml)

    copy_pairs = []
    for p in image_paths:
        pair = (os.path.join(search_path, os.path.basename(p)), p)
        if pair not in copy_pairs:
            copy_pairs.append(pair)

    # Search base images also (only for QCow2?)
    # FIXME: Current implementation expects image and corresponding base image
    # are in same dir.
    #
    # copy_pairs doubles as a work queue: base images are appended and then
    # inspected in turn, so chains of backing files are followed. An explicit
    # index avoids growing the list inside a 'for' loop over it.
    index = 0
    while index < len(copy_pairs):
        (src, dst) = copy_pairs[index]
        index += 1

        base_image_src = __base_image_path(src)
        if not base_image_src:
            continue

        base_image_dst = os.path.join(
            os.path.dirname(dst), os.path.basename(base_image_src)
        )
        base_pair = (base_image_src, base_image_dst)
        # A base image shared by several disks is copied only once.
        if base_pair not in copy_pairs:
            copy_pairs.append(base_pair)

    for (src, dst) in copy_pairs:
        __copy_image(src, dst)
def relocate_images(xml, newxml, destdir=VMM_IMAGES_DIR):
    """Copy a domain's disk images into $destdir and write updated XML.

    For every file-backed disk in $xml the image is copied to
    $destdir/<basename> and the 'file' attribute is rewritten to the new
    path. The modified document is written to $newxml.

    @param xml  Path to the original domain XML file.
    @param newxml  Path the rewritten domain XML is written to.
    @param destdir  Directory the images are copied into.
    """
    doc = libxml2.parseFile(xml)
    try:
        images = doc.xpathEval(
            '/domain/devices/disk[@type="file" and @device="disk"]/source'
        )
        fileattr = 'file'

        for image_elem in images:
            filepath = image_elem.prop(fileattr)
            filename = os.path.basename(filepath)
            newpath = os.path.join(destdir, filename)

            logging.debug("Try copying image...: %s -> %s" % (filepath, newpath))
            __copy_image(filepath, newpath)

            logging.debug("Rewrite image's path in the domain configuration")
            image_elem.setProp(fileattr, newpath)

        out = open(newxml, 'w')
        try:
            out.write(doc.serialize())
        finally:
            out.close()
        logging.info("Wrote new domain configuration: %s" % newxml)
    finally:
        # Documents built by the libxml2 bindings must be freed explicitly.
        doc.freeDoc()
def uninstall_domain(uri, name, force, backup=None):
    """Remove the named domain (guest) by delegating to __uninstall().

    @param uri  libvirt connection URI.
    @param name  Name of the domain.
    @param force  Passed through to __uninstall().
    @param backup  Passed through to __uninstall().
    @return  Whatever __uninstall() returns.
    """
    result = __uninstall(uri, name, force, backup, type=_TYPE_DOMAIN)
    return result
def uninstall_network(uri, name, force, backup=None):
    """Remove the named network by delegating to __uninstall().

    @param uri  libvirt connection URI.
    @param name  Name of the network.
    @param force  Passed through to __uninstall().
    @param backup  Passed through to __uninstall().
    @return  Whatever __uninstall() returns.
    """
    result = __uninstall(uri, name, force, backup, type=_TYPE_NETWORK)
    return result
def install_domain(uri, xml, force=False, autostart=True, search_path=None):
    """Install the domain described by the given XML file.

    The disk images are installed first via install_images(); the domain
    itself is then installed via __install().

    @param uri  libvirt connection URI.
    @param xml  Path to the domain XML file.
    @param force  Passed through to __install().
    @param autostart  Passed through to __install().
    @param search_path  Passed through to install_images().
    @return  Whatever __install() returns.
    """
    install_images(xml, search_path)
    installed = __install(uri, xml, force, autostart, type=_TYPE_DOMAIN)
    return installed
def install_network(uri, xml, force=False, autostart=True):
    """Install the network described by the given XML file.

    @param uri  libvirt connection URI.
    @param xml  Path to the network XML file.
    @param force  Passed through to __install().
    @param autostart  Passed through to __install().
    @return  Whatever __install() returns.
    """
    installed = __install(uri, xml, force, autostart, type=_TYPE_NETWORK)
    return installed
def option_parser(parser=None):
    """Add the options shared by every command to an option parser.

    @param parser  optparse.OptionParser to extend; a new one is created
                   when None is given.
    @return  The parser with --uri, -v/--verbose and -q/--quiet added.
    """
    if parser is None:
        parser = optparse.OptionParser('%prog COMMAND [OPTION ...] [ARGS...]')

    # (flags, settings) for each common option, in the order they are added.
    common_options = (
        (('--uri',),
         dict(dest='uri', action="store", type="string",
              help='Connection URI', default=VMM_URI)),
        (('-v', '--verbose'),
         dict(dest='verbose', action="store_true", default=False,
              help='verbose mode')),
        (('-q', '--quiet'),
         dict(dest='quiet', action="store_true", default=False,
              help='quiet mode')),
    )
    for (flags, settings) in common_options:
        parser.add_option(*flags, **settings)

    return parser
def usage(prog=sys.argv[0]):
    """Write the one-line usage message to standard error.

    @param prog  Program name shown in the message.
    """
    message = "Usage: %s COMMAND [OPTION ...] ARGS..." % prog
    sys.stderr.write(message + "\n")
def help_extras():
    """Write the list of supported commands to standard error."""
    commands_help = """
Command:
net-install, net-uninstall install / uninstall network.
dom-install, dom-uninstall install / uninstall domain (guest).
"""
    sys.stderr.write(commands_help + "\n")
def dom_uninstall_option_parser():
    """Build the option parser for the dom-uninstall command.

    @return  Parser with the common options plus -f/--force and --backup.
    """
    # option_parser() returns the parser it was handed, common options added.
    parser = option_parser(
        optparse.OptionParser('%prog [OPTION ...] dom-uninstall DOMAIN_NAME')
    )

    force_help = 'Uninstall the domain even if it\'s active.'
    backup_help = 'Backup domain configuration to the file.'

    parser.add_option('-f', '--force', dest='force', action="store_true",
                      default=False, help=force_help)
    parser.add_option('--backup', dest='backup', action="store", default=None,
                      type="string", help=backup_help)
    return parser
def dom_install_option_parser():
    """Build the option parser for the dom-install command.

    @return  Parser with the common options plus -f/--force, -a/--autostart
             and --search-path.
    """
    # option_parser() returns the parser it was handed, common options added.
    parser = option_parser(
        optparse.OptionParser('%prog [OPTION ...] dom-install DOMAIN_XML')
    )

    force_help = 'Install the domain even if it exists.'
    autostart_help = 'Make the domain autostarted after installed'
    search_path_help = 'Specify the path (dir) to search the domain disk images. [Default: dir contains the domain configuration]'

    parser.add_option('-f', '--force', dest='force', action="store_true",
                      default=False, help=force_help)
    parser.add_option('-a', '--autostart', dest='autostart',
                      action="store_true", default=False, help=autostart_help)
    parser.add_option('--search-path', dest='search_path', action="store",
                      default=None, type="string", help=search_path_help)
    return parser
def net_uninstall_option_parser():
    """Build the option parser for the net-uninstall command.

    @return  Parser with the common options plus -f/--force and --backup.
    """
    # option_parser() returns the parser it was handed, common options added.
    parser = option_parser(
        optparse.OptionParser('%prog [OPTION ...] net-uninstall NETWORK_NAME')
    )

    force_help = 'Uninstall the network even if it\'s active.'
    backup_help = 'Backup network configuration to the file.'

    parser.add_option('-f', '--force', dest='force', action="store_true",
                      default=False, help=force_help)
    parser.add_option('--backup', dest='backup', action="store", default=None,
                      type="string", help=backup_help)
    return parser
def net_install_option_parser():
    """Build the option parser for the net-install command.

    @return  Parser with the common options plus -f/--force and
             -a/--autostart.
    """
    # option_parser() returns the parser it was handed, common options added.
    parser = option_parser(
        optparse.OptionParser('%prog [OPTION ...] net-install NETWORK_XML')
    )

    force_help = 'Re-install the network even if it exists.'
    autostart_help = 'Make the network autostarted after installed'

    parser.add_option('-f', '--force', dest='force', action="store_true",
                      default=False, help=force_help)
    parser.add_option('-a', '--autostart', dest='autostart',
                      action="store_true", default=False, help=autostart_help)
    return parser
def main():
    """Parse the command line and run the requested command.

    sys.argv[1] selects the command; any argument starting with 'net-i',
    'net-u', 'dom-i' or 'dom-u' is accepted. The remaining arguments are
    parsed with that command's option parser and the first positional
    argument is the XML file (install) or the target name (uninstall).

    Exits with status 0 when the command function returns a true value and
    with status 1 otherwise, including usage errors.
    """
    loglevel = logging.INFO

    if len(sys.argv) < 2:
        usage()
        sys.exit(1)

    cmd = sys.argv[1]

    if cmd.startswith('net-i'):
        cmd = 'net-install'
    elif cmd.startswith('net-u'):
        cmd = 'net-uninstall'
    elif cmd.startswith('dom-i'):
        cmd = 'dom-install'
    elif cmd.startswith('dom-u'):
        cmd = 'dom-uninstall'
    else:
        usage()
        help_extras()
        sys.exit(1)

    # FIXME: Hack. Looks up e.g. net_install_option_parser by name.
    parser_f = globals().get('%s_option_parser' % cmd.replace('-', '_'))
    parser = parser_f()

    (options, args) = parser.parse_args(sys.argv[2:])

    if options.verbose:
        loglevel = logging.DEBUG

    # Checked last so that --quiet wins when both flags are given.
    if options.quiet:
        loglevel = logging.WARN

    # logging.basicConfig() in python older than 2.4 cannot handle kwargs,
    # then exception 'TypeError' will be thrown.
    try:
        logging.basicConfig(level=loglevel)
    except TypeError:
        # To keep backward compatibility. See above comment also.
        logging.getLogger().setLevel(loglevel)

    # Every command needs exactly one positional argument.
    if len(args) < 1:
        parser.print_help()
        sys.exit(1)

    target = args[0]

    try:
        if cmd == 'net-install':
            ret = install_network(options.uri, target, options.force,
                                  options.autostart)
        elif cmd == 'net-uninstall':
            if not options.force:
                confirm('uninstall the network %s' % target)
            ret = uninstall_network(options.uri, target, options.force,
                                    options.backup)
        elif cmd == 'dom-install':
            ret = install_domain(options.uri, target, options.force,
                                 options.autostart, options.search_path)
        else:  # cmd == 'dom-uninstall'
            if not options.force:
                confirm('uninstall the domain %s' % target)
            ret = uninstall_domain(options.uri, target, options.force,
                                   options.backup)
    except ActiveTargetError:
        # Report the refusal as a normal error instead of a traceback.
        logging.error(str(sys.exc_info()[1]))
        ret = False

    # The command functions return True on success; the process exit status
    # must be 0 in that case, so the boolean has to be inverted.
    sys.exit(int(not ret))
# Run main() only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
# vim:sw=4:ts=4:et:ft=python:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment