Skip to content

Instantly share code, notes, and snippets.

@Millnert
Last active February 19, 2023 08:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Millnert/ad1eadd53824a5e405acd451ef5338cd to your computer and use it in GitHub Desktop.
Save Millnert/ad1eadd53824a5e405acd451ef5338cd to your computer and use it in GitHub Desktop.
simple glance qcow2->raw conversion
#!/usr/bin/python
""" Glance client to convert QCOW2 images in Glance to RAW """
# from pprint import pprint
import collections
import os
import subprocess
from os import environ as env
# import sys

from glanceclient.v2 import client as glclient
from glanceclient.v2 import shell as glshell
from glanceclient.common import utils
from keystoneauth1.identity import v3 as keystone_v3
from keystoneauth1 import session as keystone_session
from keystoneclient.v3 import client as ksclient
def get_keystone_session():
    """ Build a keystoneauth1 session from the OS_* environment variables.

    Reads OS_AUTH_URL, OS_USERNAME, OS_PASSWORD, OS_TENANT_ID,
    OS_TENANT_NAME, OS_PROJECT_DOMAIN_ID, OS_USER_DOMAIN_ID and OS_CACERT.
    The two domain ids fall back to 'default' when unset.

    Returns:
        A keystone_session.Session using a v3 Password auth plugin.
    """
    auth_url = env.get('OS_AUTH_URL')
    cacert = env.get('OS_CACERT')
    username = env.get('OS_USERNAME')
    password = env.get('OS_PASSWORD')
    project_domain_id = env.get('OS_PROJECT_DOMAIN_ID', 'default')
    user_domain_id = env.get('OS_USER_DOMAIN_ID', 'default')

    # Do not default the project id when a project name was supplied:
    # a placeholder id of 'default' would be passed alongside the real name
    # and (NOTE(review): keystoneauth is understood to prefer project_id over
    # project_name when both are set - confirm) the name would be ignored.
    project_id = env.get('OS_TENANT_ID')
    project_name = env.get('OS_TENANT_NAME')
    if project_id is None and project_name is None:
        # Neither given: keep the historical fallback values.
        project_id = 'default'
        project_name = 'default'

    # Password plugin reference:
    # http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.auth.identity.v3.html#module-keystoneclient.auth.identity.v3.password
    auth = keystone_v3.Password(auth_url=auth_url, username=username,
                                password=password, project_id=project_id,
                                project_name=project_name,
                                project_domain_id=project_domain_id,
                                user_domain_id=user_domain_id)

    # keystoneclient sessions are deprecated in favour of keystoneauth ones:
    # http://docs.openstack.org/developer/keystoneauth/using-sessions.html#sessions-for-users
    # OS_CACERT, when set, is handed over as the CA bundle to verify against;
    # when unset, None is passed through unchanged.
    return keystone_session.Session(auth=auth, verify=cacert)
def get_endpoints_generic(session, region, servicename, interface):
    """ Return Keystone endpoints filtered by region, service name and interface.

    Each of region, servicename and interface may be None, meaning "do not
    filter on this criterion". The criteria that are given are AND:ed.
    Endpoints whose service_id is not among the listed services are never
    returned.

    Args:
        session: session object handed to the keystone v3 client.
        region: value compared against endpoint.region, or None.
        servicename: value compared against the owning service's name,
            or None.
        interface: value compared against endpoint.interface, or None.

    Returns:
        A list of matching endpoint objects, in the order Keystone listed
        them (so taking the first element is deterministic).
    """
    # http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.v3.html#module-keystoneclient.v3.client
    keystone = ksclient.Client(session=session)
    # http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.v3.html#keystoneclient.v3.endpoints.EndpointManager
    endpoints = keystone.endpoints.list()
    services = keystone.services.list()
    services_by_id = dict((service.id, service) for service in services)

    # Single ordered pass instead of building three sets and intersecting
    # them: same selection, stable ordering, no need for hashable endpoints.
    matching = []
    for endpoint in endpoints:
        service = services_by_id.get(endpoint.service_id)
        if service is None:
            continue
        if servicename is not None and service.name != servicename:
            continue
        if region is not None and endpoint.region != region:
            continue
        if interface is not None and endpoint.interface != interface:
            continue
        matching.append(endpoint)
    return matching
def get_glance_endpoints(keystone, region, interface='public'):
    """ Look up the endpoints of the service named 'glance'.

    The first argument is forwarded unchanged as the session argument of
    get_endpoints_generic; region and interface are forwarded as filters.
    """
    return get_endpoints_generic(session=keystone, region=region,
                                 servicename='glance', interface=interface)
def get_glanceclient():
    """ Return a Glance v2 client for the region named in OS_REGION_NAME.

    The client is pointed at the first public Glance endpoint found in
    Keystone for that region (all regions when OS_REGION_NAME is unset).

    Raises:
        RuntimeError: if Keystone lists no matching Glance endpoint.
    """
    region = env.get('OS_REGION_NAME')
    session = get_keystone_session()
    glance_endpoints = get_glance_endpoints(session, region)
    if not glance_endpoints:
        # Fail with an explicit message instead of an unbound local / None.
        raise RuntimeError("No public glance endpoint found in keystone "
                           "(region: %s)" % region)
    return glclient.Client(glance_endpoints[0].url, session=session)
# A glance image object looks like this
# {u'architecture': u'x86_64',
# u'checksum': u'38d62e2e1909c89f72ba4d5f5c0005d5',
# u'container_format': u'bare',
# u'created_at': u'2016-04-06T19:54:20Z',
# u'direct_url': u'rbd://8ea29b52-232a-4b77-ad71-21e5de70960f/glance-images/
# 66a32ab4-aca9-4a58-b512-249a4ecef9e8/snap',
# u'disk_format': u'qcow2',
# u'file': u'/v2/images/66a32ab4-aca9-4a58-b512-249a4ecef9e8/file',
# u'id': u'66a32ab4-aca9-4a58-b512-249a4ecef9e8',
# u'min_disk': 0,
# u'min_ram': 0,
# u'name': u'fedora-23-cloud-qcow2',
# u'owner': u'4b1903a745484428a816c905ec00a970',
# u'protected': True,
# u'schema': u'/v2/schemas/image',
# u'size': 234363392,
# u'status': u'active',
# u'tags': [],
# u'updated_at': u'2016-04-07T09:10:03Z',
# u'virtual_size': None,
# u'visibility': u'public'},
def get_images(glance):
    """ Return everything glance.images.list() yields, as a list """
    return list(glance.images.list())
def filter_images_by_diskformat(images, diskformat):
    """ Return the images whose 'disk_format' equals diskformat.

    Args:
        images: iterable of mapping-like image records.
        diskformat: disk format to keep, e.g. 'qcow2'.

    Returns:
        A new list, in input order. Records that carry no 'disk_format'
        key at all are skipped rather than raising KeyError.
    """
    return [img for img in images
            if 'disk_format' in img and img['disk_format'] == diskformat]
def download_image(glance, image, filename, progress=False, verbose=False):
    """ Fetch the data of one image into a local file.

    Delegates to the glance shell's do_image_download, which expects an
    argparse-like object carrying id, file and progress attributes.
    """
    if verbose:
        print("Downloading to %s" % filename)
    # Stand-in for the parsed command line the shell helper expects.
    shell_args_type = collections.namedtuple('gcargs',
                                             ['id', 'file', 'progress'])
    shell_args = shell_args_type(image['id'], filename, progress)
    glshell.do_image_download(glance, shell_args)
def convert_image(file_path_name):
    """ Run 'qemu-img convert' on a local .qcow2 file.

    The output is written next to the input, with the '.qcow2' suffix
    replaced by '.raw'.

    Returns:
        The path of the converted file.

    Raises:
        Exception: if the name does not end in '.qcow2', or if qemu-img
            exits with a non-zero status.
    """
    suffix = '.qcow2'
    if not file_path_name.endswith(suffix):
        raise Exception("File given does not have qcow2 ending")

    raw_file_path_name = file_path_name[:-len(suffix)] + '.raw'
    command = ['qemu-img', 'convert', file_path_name, raw_file_path_name]
    exit_status = subprocess.call(command)
    if exit_status != 0:
        raise Exception("Conversion did not work. "
                        "Try it manually: qemu-img convert %s %s" %
                        (file_path_name, raw_file_path_name))
    return raw_file_path_name
def create_clone_image(glance, source_image):
    """ Register a new raw-format image that mirrors source_image.

    Copies a fixed set of properties from source_image (those that are
    present), forces disk_format to 'raw', and passes the result to
    glance.images.create.

    Returns:
        Whatever glance.images.create returns (the new image record).
    """
    copied_properties = (
        "architecture",
        "container_format",
        "name",
        "min_disk",
        "min_ram",
        "protected",
        "tags",
        "visibility",
    )
    clone_dict = dict((key, source_image[key])
                      for key in copied_properties
                      if key in source_image)
    clone_dict["disk_format"] = "raw"
    return glance.images.create(**clone_dict)
def upload_image(glance, image, filename, progress=False, verbose=False):
    """ Send the contents of a local file as the data of an image.

    Delegates to the glance shell's do_image_upload, which expects an
    argparse-like object carrying id, file, size and progress attributes;
    size is passed as None.
    """
    if verbose:
        print("Uploading %s to %s (%s)" % (filename, image['name'],
                                           image['id']))
    # Stand-in for the parsed command line the shell helper expects.
    shell_args_type = collections.namedtuple(
        'gcargs', ['id', 'file', 'size', 'progress'])
    shell_args = shell_args_type(image['id'], filename, None, progress)
    glshell.do_image_upload(glance, shell_args)
def process_image(glance, image, workingpath=".", progress=False, verbose=True,
                  keep_files=False):
    """ Download, convert, clone-create and upload one image.

    Args:
        glance: glance client used for all API calls.
        image: mapping-like record of the qcow2 image to convert.
        workingpath: directory that receives the intermediate files.
        progress: forwarded to the download and upload helpers.
        verbose: print a line per step when true.
        keep_files: when true, leave the downloaded .qcow2 and converted
            .raw files in workingpath after a successful upload.

    If any step raises, the intermediate files are left in place so the
    failed step can be retried by hand.
    """
    # NOTE: marking the source image with a "converted-to-raw" tag (and
    # skipping images that carry it) was tried but did not work, so every
    # qcow2 image passed in is processed again on each run.
    if verbose:
        print("Processing %s (%s) [%s]" % (
            image['name'], image['id'],
            utils.make_size_human_readable(image['size'])))
    temp_filename = "%s/%s.qcow2" % (workingpath, image.get("id"))
    download_image(glance, image, temp_filename, progress=progress,
                   verbose=verbose)
    if verbose:
        print("Converting image")
    converted_filename = convert_image(temp_filename)
    clone_image = create_clone_image(glance, image)
    upload_image(glance, clone_image, converted_filename, progress=progress,
                 verbose=verbose)

    # The intermediate files are each as large as the image itself; without
    # cleanup the working directory grows with every processed image.
    if not keep_files:
        for leftover in (temp_filename, converted_filename):
            if os.path.exists(leftover):
                os.remove(leftover)
def main():
    """ Convert every qcow2 image known to Glance, one after the other.

    Intermediate files go to the current directory; progress output and
    verbose messages are switched on.
    """
    workingdir = "."
    glance = get_glanceclient()
    all_images = get_images(glance)
    for image in filter_images_by_diskformat(all_images, 'qcow2'):
        process_image(glance, image, workingpath=workingdir, progress=True,
                      verbose=True)


# Only run the conversion when executed as a script, not on import.
if __name__ == '__main__':
    main()
@shaleh
Copy link

shaleh commented Apr 8, 2016

This is shaleh from irc.

Suggestions:

var = dict.get(key)  # instead of set to None and check if key in dict then extract value
print "Message begin", var  # instead of all of the %s formatting

Also, when in doubt remember the debugging one liner

import pdb; pdb.set_trace()

@Millnert
Copy link
Author

Millnert commented Apr 9, 2016

Thanks shaleh! I have now updated the gist with what I did during the remainder of the night. I cheated a little and reused the upload and download functions as implemented in glance's shell client.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment