Skip to content

Instantly share code, notes, and snippets.

@jtyr
Last active March 16, 2022 20:53
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtyr/779fdb0b719ef15c0b67726182af843e to your computer and use it in GitHub Desktop.
Save jtyr/779fdb0b719ef15c0b67726182af843e to your computer and use it in GitHub Desktop.
Get information about all VMs from vCenter
#!/usr/bin/env python
import argparse
import atexit
import getpass
import logging
import os
import re
import socket
import sys
import traceback
import yaml
from pyVim import connect
from pyVmomi import vim
from pyVmomi import vmodl
# Grouping rules keyed by vCenter host name; the rules of the current vCenter
# are evaluated against the records built in get_vm_info(). Populated from
# the external config file.
GROUPS = {}

# Global logger variable (None until main() configures it)
log = None

# Whether to debug grouping condition evaluation; switched on by setting the
# VCENTER_DUMP_EVAL_COND_DEBUG environment variable to 1, true, yes or y
# (case-insensitive)
EVAL_COND_DEBUG = os.getenv('VCENTER_DUMP_EVAL_COND_DEBUG', '').lower() in [
    '1', 'true', 'yes', 'y']
# This helps to improve YAML formatting
class MyDumper(yaml.Dumper):
    """yaml.Dumper variant that never uses the 'indentless' mode."""

    def increase_indent(self, flow=False, indentless=False):
        # The 'indentless' value requested by the caller is deliberately
        # ignored; False is always handed over to the parent class
        parent = super(MyDumper, self)

        return parent.increase_indent(flow, False)
def debug(msg):
    """Log msg at the debug level, but only if EVAL_COND_DEBUG is enabled."""
    if not EVAL_COND_DEBUG:
        return

    log.debug(msg)
def get_args():
    """Parse the command line arguments.

    When no password is given on the command line, it is asked for
    interactively (without echoing it).

    Returns the argparse namespace holding all the options.
    """
    parser = argparse.ArgumentParser(
        description='Get information about all VMs from vSphere')
    parser.add_argument(
        '-s', '--host',
        required=True,
        help='vSphere hostname or IP')
    parser.add_argument(
        '-p', '--port',
        type=int,
        default=443,
        help='vSphere port number (default 443)')
    parser.add_argument(
        '-S', '--disable_ssl_verification',
        action='store_true',
        help='Disable verification of the SSL certificate')
    parser.add_argument(
        '-U', '--user',
        required=True,
        help='Username used for connecting to vSphere')
    parser.add_argument(
        '-P', '--password',
        help='Password used for connecting to vSphere')
    parser.add_argument(
        '-f', '--file',
        required=True,
        help='Filename of the existing YAML inventory')
    parser.add_argument(
        '-i', '--filter',
        help='Fetch only VMs defined by regexp')
    parser.add_argument(
        '-r', '--resolve',
        action='store_true',
        help='Do not trust the IP from vCenter, try to resolve the hostname')
    parser.add_argument(
        '-n', '--ipinfo',
        action='store_true',
        help='Adds ip_info for each record')
    parser.add_argument(
        '-e', '--esxihost',
        action='store_true',
        help='Adds ESXi host for each record')
    parser.add_argument(
        '-g', '--group',
        help='Default group name for the vCenter')
    parser.add_argument(
        '-o', '--stdout',
        action='store_true',
        help='Print output to STDOUT instead of back into the file')
    parser.add_argument(
        '-d', '--debug',
        action='store_true',
        help='Debug mode')
    parser.add_argument(
        '-c', '--config',
        help=(
            'Path to the config file (default vcenter_dump.yaml in the '
            'directory where the script lives)'))

    args = parser.parse_args()

    # Ask for the password only if it was not passed as an option
    if not args.password:
        args.password = getpass.getpass(
            prompt='Enter password for host %s and user %s: ' % (
                args.host, args.user))

    return args
def read_config_file(args):
    """Load the grouping rules from the config file.

    The file given by args.config is used; when that is None, the file
    vcenter_dump.yaml from the directory of this script is read instead.
    If GROUPS is already populated, it is returned without reading anything.

    Returns the parsed content of the config file ({} for an empty file).
    Exits with status 1 if the file is not valid YAML.
    """
    # Nothing to read if the groups are already known
    if len(GROUPS) > 0:
        return GROUPS

    config_file = args.config

    if config_file is None:
        # Default config path is where the script lives
        config_file = "%s/vcenter_dump.yaml" % os.path.dirname(
            os.path.realpath(__file__))

    with open(config_file, 'r') as stream:
        try:
            data = yaml.safe_load(stream)
        except yaml.YAMLError as e:
            log.error("Cannot parse YAML file: %s" % e)
            sys.exit(1)

    # An empty file is parsed as None but the result is used as a dict
    if data is None:
        data = {}

    return data
def read_yaml_file(args):
    """Load the existing YAML inventory from args.file.

    Returns the parsed content of the file, or an empty list when the file
    does not exist or is empty. Exits with status 1 if the file is not valid
    YAML.
    """
    log.debug("Reading YAML inventory")

    # No inventory file yet means an empty inventory
    if not os.path.exists(args.file):
        return []

    with open(args.file, 'r') as stream:
        try:
            loaded = yaml.safe_load(stream)
        except yaml.YAMLError as e:
            log.error("Cannot parse YAML file: %s" % e)
            sys.exit(1)

    # An empty file is parsed as None
    return [] if loaded is None else loaded
def get_ip(vm, resolve):
    """Choose the IP address to be recorded for the VM.

    The guest IP from the VM summary is the default. If 'resolve' is set and
    the (stripped) VM name can be resolved, the resolved address is used
    instead. If nothing was resolved and the guest IP is missing or is an
    IPv6 address, the NICs of the guest are searched for the first preferred
    IPv4 address which does not end with '.0' or '.1'.

    Returns a tuple of the IP (string or None) and a short note describing
    where the IP comes from.
    """
    summary = vm.summary
    guest_ip = summary.guest.ipAddress

    # Take the guest IP as the default one
    ip = guest_ip
    ip_info = 'summary'
    found = False

    if resolve:
        # A failed lookup is ignored, the other sources are used instead
        try:
            ip = socket.gethostbyname(summary.config.name.strip())
        except Exception:
            pass
        else:
            found = True

            if guest_ip is None:
                ip_info = "resolved %s" % ip
            elif ip == guest_ip:
                ip_info = "resolved the same"
            else:
                ip_info = "resolved %s != %s" % (ip, guest_ip)

    # Find the most appropriate IP address from VMware
    if not found and (guest_ip is None or ':' in guest_ip):
        for nic in vm.guest.net:
            if nic.ipConfig is None:
                continue

            for addr in nic.ipConfig.ipAddress:
                if addr.state != 'preferred':
                    continue

                candidate = addr.ipAddress

                # Ignore IPv6 addresses as well as NET and GW IPs
                if ':' in candidate or candidate.endswith(('.1', '.0')):
                    continue

                ip = candidate
                found = True

                break

            if found:
                if guest_ip is None:
                    ip_info = "found %s" % ip
                elif ip == guest_ip:
                    ip_info = "found the same"
                else:
                    ip_info = "found %s != %s" % (ip, guest_ip)

                break

    return (None if ip is None else str(ip)), ip_info
def get_dc(vm):
    """Return the name of the Datacenter the VM belongs to.

    Returns None when no Datacenter is found among the parents of the VM.
    """
    node = vm.parent

    # Walk through the folder tree until we hit the Datacenter type or the end
    while node is not None and not isinstance(node, vim.Datacenter):
        node = node.parent

    # The walk ends on None if there is no Datacenter above the VM
    if node is None:
        return None

    return node.name
def get_vm_info(cfm, vm, args):
    """Compose the inventory record for one VM.

    cfm: custom field definitions; not used by this function
    vm: the VM object to read the summary from
    args: command line options (resolve, ipinfo and esxihost are used)

    Returns the record (dict) or None if the VM no longer exists.
    """
    try:
        summary = vm.summary
    except vmodl.fault.ManagedObjectNotFound as e:
        log.warning("%s" % e)

        return None

    # Set default values
    uuid = None
    guest = None
    name = None
    state = None
    host = None
    cluster = None

    if summary.config.uuid is not None:
        uuid = str(summary.config.uuid)

    if summary.config.name is not None:
        name = str(summary.config.name.strip())

    if summary.config.guestId is not None:
        guest = str(summary.config.guestId)

    if summary.runtime.powerState is not None:
        state = str(summary.runtime.powerState)

    # The VM may have no ESXi host assigned; then both the host and the
    # cluster stay None
    esxi = summary.runtime.host

    if esxi is not None:
        if esxi.name is not None:
            host = str(esxi.name)

        # Get cluster
        cluster = esxi.parent.name

    # Get IP
    ip, ip_info = get_ip(vm, args.resolve)

    # Get DC
    dc = get_dc(vm)

    # Compose the machine record
    record = {
        '_cluster': cluster,
        '_dc': dc,
        'ip': ip,
        'name': name,
        'state': state,
        'vcenter': {
            'guest_id': guest,
            'uuid': uuid,
        },
    }

    if args.ipinfo:
        record['vcenter']['ip_info'] = ip_info

    if args.esxihost:
        record['vcenter']['esxi_host'] = host

    return record
def get_vms(args, pat):
    """Fetch the records of all VMs from the vCenter.

    Templates are skipped. If args.filter is set, only VMs whose name is
    matched by the compiled regexp 'pat' are processed.

    Returns a list of records as composed by get_vm_info(). Any error is
    logged together with its traceback and the program exits with status 1.
    """
    log.debug("Fetching VMs from vCenter")

    # This is where we store records about the VMs
    data = []

    try:
        # Choose connection method using SSL or not
        connMethod = connect.SmartConnect

        if args.disable_ssl_verification:
            connMethod = connect.SmartConnectNoSSL

        # Establish connection
        connection = connMethod(
            host=args.host,
            user=args.user,
            pwd=args.password,
            port=args.port)

        # Register handler for automatic disconnection
        atexit.register(connect.Disconnect, connection)

        # Get view with desired information
        content = connection.RetrieveContent()
        folder = content.rootFolder
        viewType = [vim.VirtualMachine]
        recursive = True
        container = content.viewManager.CreateContainerView(
            folder, viewType, recursive)

        # Create Traversal spec
        traversal_spec = vmodl.query.PropertyCollector.TraversalSpec(
            name="traversal_spec",
            path='view',
            skip=False,
            type=vim.view.ContainerView
        )

        # Create Property Spec
        property_spec = vmodl.query.PropertyCollector.PropertySpec(
            type=vim.VirtualMachine,
            all=False,
            pathSet=['name']
        )

        # Create Object Spec
        object_spec = vmodl.query.PropertyCollector.ObjectSpec(
            obj=container,
            skip=True,
            selectSet=[traversal_spec]
        )

        # Create Filter Spec
        filter_spec = vmodl.query.PropertyCollector.FilterSpec(
            objectSet=[object_spec],
            propSet=[property_spec],
            reportMissingObjectsInResults=False
        )

        # Retrieve objects
        objects = connection.content.propertyCollector.RetrieveContents(
            [filter_spec])

        vms = []

        # Get only valid VM objects
        for vm_obj in objects:
            if len(vm_obj.propSet) != 1:
                continue

            vm_name = vm_obj.propSet[0].val

            # The config may be unset (e.g. when the VM files are not
            # accessible); skip such a VM instead of failing the whole run
            config = vm_obj.obj.config

            if config is None:
                log.warning(
                    "Skipping VM '%s' with no configuration" % vm_name)

                continue

            if (
                    not config.template and (
                        args.filter is None or
                        pat.search(vm_name) is not None)):
                vms.append(vm_obj.obj)

        # Get custom field manager
        cfm = connection.content.customFieldsManager.field

        # Count number of VMs
        vms_len = len(vms)

        log.debug("Processing VMs")

        # Walk through all the selected VMs (progress is counted from 1)
        for n, vm in enumerate(vms, 1):
            if args.debug:
                # '\r' keeps the progress on a single line
                sys.stderr.write(
                    "DEBUG: [%s] %d/%d%s\r" % (log.name, n, vms_len, ' '*10))
                sys.stderr.flush()

            if (
                    args.filter is None or
                    pat.search(vm.summary.config.name) is not None):
                vm_info = get_vm_info(cfm, vm, args)

                if vm_info is not None:
                    data.append(vm_info)
    except Exception as e:
        # Top-level boundary: report the failure with traceback and give up
        log.error("%s" % e)
        sys.stderr.write("%s\n" % ('-'*60))
        traceback.print_exc(file=sys.stderr)
        sys.stderr.write("%s\n" % ('-'*60))
        sys.stderr.flush()
        sys.exit(1)

    return data
def update_vm(i, d):
    """Update the inventory record 'i' in place with the fetched record 'd'.

    All fields of 'd' are copied into 'i' with these exceptions:
      - 'ip' is kept if i['vcenter']['ip_info'] is 'manual'
      - 'ip' is kept if the VM is powered off and has no IP in 'd'
      - i['vcenter']['ip_info'] is kept if it is 'manual'

    The 'vcenter' section is merged key by key, so keys existing only in the
    inventory record are preserved.
    """
    for k in d:
        if k == 'ip':
            section = i.get('vcenter')

            # Don't change IP if ip_info is manual
            manual = (
                isinstance(section, dict) and
                section.get('ip_info') == 'manual')
            # Don't change IP for powered off VMs
            powered_off = d.get('state') == 'poweredOff' and d[k] is None

            if manual or powered_off:
                continue

        if k == 'vcenter':
            for vk, vv in d[k].items():
                section = i.get(k)

                # Create the section if missing (or not usable as a dict)
                if not isinstance(section, dict):
                    section = i[k] = {}

                # Don't change vcenter.ip_info if it's set to 'manual'; the
                # value lives in the 'vcenter' section, not in the record
                if vk == 'ip_info' and section.get(vk) == 'manual':
                    continue

                section[vk] = vv
        elif k not in i or i[k] != d[k]:
            # Add new or update non-matching fields
            i[k] = d[k]
def _find_inventory_record(inventory, d):
    """Find the inventory record which describes the same VM as 'd'.

    The UUID is tried first, then the name and finally the IP. A value which
    is None never matches anything, so records that merely lack the value
    are not mixed up.

    Returns a tuple of the record and the name of the matching field, or
    (None, None) if there is no such record.
    """
    uuid = d['vcenter']['uuid']

    # Search if there is the same UUID
    if uuid is not None:
        for i in inventory:
            section = i.get('vcenter')

            if (
                    isinstance(section, dict) and
                    'uuid' in section and
                    uuid == section['uuid']):
                return i, 'uuid'

    # Search if there is the same NAME and then the same IP
    for key in ('name', 'ip'):
        if d[key] is None:
            continue

        for i in inventory:
            if key in i and d[key] == i[key]:
                return i, key

    return None, None


def update_vms(inventory, data):
    """Merge the fetched records into the inventory (in place).

    A record matching an inventory record (see _find_inventory_record) is
    merged into it by update_vm(); the '_updated' field of the inventory
    record then says which field matched. Records with no match are appended
    with '_updated' set to 'new'.
    """
    log.debug("Updating inventory")

    for d in data:
        record, matched_by = _find_inventory_record(inventory, d)

        if record is None:
            # Add a missing record
            d['_updated'] = 'new'
            inventory.append(d)
        else:
            # Update existing record
            update_vm(record, d)
            record['_updated'] = matched_by
def clear_inventory(inventory, args):
    """Build the final inventory out of the processed records.

    Only records which were updated or added (they carry the '_updated'
    field) are kept; if args.filter is set, all records are kept. Fields
    starting with '_' are used only for processing and are left out.

    Returns a new list of new records; the input is not modified.
    """
    log.debug("Cleaning up the inventory")

    keep_all = args.filter is not None

    return [
        {k: v for k, v in i.items() if not k.startswith('_')}
        for i in inventory
        if '_updated' in i or keep_all]
# Taken from yaml_list_inventory
# (https://github.com/jtyr/ansible-yaml_list_inventory)
def get_host_key_value(host, key):
    """Look up a dot-separated key path in a (nested) host record.

    Each segment of the path names a dict key and can end with a list index,
    e.g. 'vcenter.uuid' or 'disks[0].size'.

    Returns a tuple (exists, value); the value is None if the path does not
    exist in the record.
    """
    value = host

    for segment in key.split('.'):
        # Test if the segment is a ref to a list's item
        m = re.match(r'(.*)\[(\d+)\]$', segment)
        idx = None

        if m is not None:
            segment = m.group(1)
            idx = int(m.group(2))

        # Only a dict can be descended into; None, strings, lists etc. mean
        # that the path does not exist
        if not isinstance(value, dict) or segment not in value:
            return False, None

        value = value[segment]

        if idx is not None:
            if not isinstance(value, list) or idx >= len(value):
                return False, None

            value = value[idx]

    # The whole path was walked successfully
    return True, value
# Taken from yaml_list_inventory
# (https://github.com/jtyr/ansible-yaml_list_inventory)
def eval_condition(host, conditions, default=True):
    """Evaluate a list of conditions against a host record.

    host: record whose values are looked up by get_host_key_value()
    conditions: list of dicts mapping a key path to an expected value or to
        a list of expected values. A key prefixed with '@' is optional. An
        expected value is None, a literal string, '!literal' (negation),
        '~regexp' or '!~regexp'; regexps are applied by re.match().
    default: the result for an empty list of conditions; it also selects
        the 'accept'/'ignore' wording of the debug messages

    Returns the result of the evaluation ('default' if there are no
    conditions).

    NOTE(review): this copy of the file had lost its indentation, so the
    nesting below is a reconstruction. The level of the 'neg_ret' checks
    after the comparison chain is the least certain part - confirm it
    against the upstream source.
    NOTE(review): expected values other than None are used as strings
    (startswith), so e.g. a number in the config would raise here.
    """
    debug("Starting %s" % ('accept' if default else 'ignore'))
    debug("Data: %s" % host)

    # 'ret' is the running result; with no conditions the loop below does
    # not run and the default is returned
    if len(conditions) == 0:
        ret = default
    else:
        ret = False

    # Loop through all conditions
    for c in conditions:
        # 'i' counts the processed keys to recognize the last one (used for
        # optional keys below)
        i = 0
        c_len = len(c.items())

        # Loop through all keys/values of each condition
        for k, k_v in c.items():
            i += 1
            optional = False
            # 'neg' is set once any expected value of this key is a negation
            neg = False

            # Check if the key is optional
            if k.startswith('@'):
                k = k[1:]
                optional = True

            # Check if the key exists in the host
            hk_exists, h_v = get_host_key_value(host, k)

            # Normalize the value of the key to be always list
            if not isinstance(k_v, list):
                k_v = [k_v]

            if hk_exists:
                # If the key exists, normalize the value
                if isinstance(h_v, list):
                    h_vals = h_v
                else:
                    h_vals = [h_v]

                # Turns False as soon as a negated value matches (or the
                # comparison is not possible)
                neg_ret = True

                # Loop through all values of the key
                for v in k_v:
                    # Check if the value is negation
                    if v is not None and v.startswith('!'):
                        neg = True

                    # Loop through all value items
                    for h_val in h_vals:
                        debug(
                            " Key '%s' exists - comparing condition %s=%s "
                            "with value %s" % (k, k, v, h_val))

                        # Compare the host value with the condition value
                        if v is None:
                            # None is matched only by None
                            if h_val is None:
                                debug(" Matched None value")
                                ret = True
                            else:
                                debug(" Nothing matches None")
                                ret = False
                                neg_ret = False
                        elif h_val is not None:
                            # The order matters: '!~' has to be tested
                            # before '~' and '!'
                            if (
                                    v.startswith('!~') and
                                    re.match(v[2:], h_val) is not None):
                                debug(" Matched negative regexp value")
                                ret = False
                                neg_ret = False
                            elif (
                                    v.startswith('~') and
                                    re.match(v[1:], h_val) is not None):
                                debug(" Matched regexp value")
                                ret = True
                            elif (
                                    v.startswith('!') and
                                    h_val == v[1:]):
                                debug(
                                    " Matched negative value")
                                ret = False
                                neg_ret = False
                            elif h_val == v:
                                debug(" Matched value")
                                ret = True
                            else:
                                debug(" Nothing matches")
                                ret = False
                                neg_ret = True
                        else:
                            # Host value is None while a non-None value is
                            # expected
                            debug(
                                " Nothing matches (should not happen)")
                            ret = False
                            neg_ret = False

                        # NOTE(review): nesting level of these two checks is
                        # reconstructed - confirm
                        if not neg_ret:
                            debug(
                                " <- Breaking value loop because net_reg "
                                "is False")
                            ret = neg_ret
                            break
                        elif not neg and ret:
                            debug(
                                " <- Breaking value loop because cond is "
                                "True")
                            break

                    # With a negated value involved, the result is given by
                    # 'neg_ret'
                    if neg:
                        debug(" <- Taking net_reg value")
                        ret = neg_ret
            elif optional:
                debug(" Key '%s' is optional" % k)

                # A missing optional key passes unless it is the last key
                # of the condition (then 'ret' is left as it is)
                if i < c_len:
                    ret = True
            else:
                debug(" Key '%s' does not exist" % k)
                ret = False

            # One failed key fails the whole condition
            if not ret:
                debug(
                    " <- Breaking key loop because one of the values turn "
                    "ret=False")
                break

        # The first satisfied condition decides
        if ret:
            debug(" <- Breaking cond loop because ret=True")
            break

    debug(
        "Finishing %s with ret=%s" % (
            ('accept' if default else 'ignore'),
            ret))

    return ret
def enrich_groups(inventory, args, pat):
    """Maintain the 'ansible.group' field of the inventory records in place.

    The grouping rules of the current vCenter (GROUPS[args.host]) are
    evaluated for every record covered by the filter: groups whose
    conditions match are added, groups whose conditions do not match are
    removed. The default group (args.group) and any group the default group
    name starts with are left out of the result. Nothing is done if there
    are no rules for the vCenter.

    A single resulting group is stored as a string, more groups as a sorted
    list.
    """
    log.debug("Enriching ansible.group")

    if args.host not in GROUPS:
        return

    for i in inventory:
        name = i.get('name')

        # Skip records not covered by the filter. Without a filter every
        # record is processed, even one with no name.
        if args.filter is not None and (
                name is None or (
                    args.filter != name and
                    pat.search(name) is None)):
            continue

        groups = []
        grps = []

        # Get the current list of groups into temporal variable
        if 'ansible' in i and 'group' in i['ansible']:
            if isinstance(i['ansible']['group'], list):
                # Make the list unique
                grps = list(set(i['ansible']['group']))
            else:
                # Baseline to list
                grps = [i['ansible']['group']]

        for group, cond in GROUPS[args.host].items():
            # Check if the group should be added
            should_be_added = eval_condition(i, cond)

            if not should_be_added and group in grps:
                # Remove the group if defined and it should not be
                grps.remove(group)
            elif should_be_added and group != args.group:
                debug('Group %s should be added' % group)

                # Check if group already exists
                for g in grps:
                    if g.startswith(group):
                        debug('Group %s alredy covers the group' % g)

                        break
                else:
                    # No existing group covers this one
                    debug('Adding group %s' % group)

                    # Add group
                    grps.append(group)

            debug('---')

        # Clean up parent groups; with no default group there is nothing to
        # clean up and all the groups are kept
        for g in grps:
            if args.group is None or (
                    g != args.group and
                    not args.group.startswith(g)):
                debug("Appending group %s" % g)
                groups.append(g)

        if len(groups) == 0:
            # Remove the 'ansible' section if empty
            if (
                    'ansible' in i and
                    isinstance(i['ansible'], dict) and
                    len(i['ansible']) == 0):
                del i['ansible']
        else:
            # Order and (re)construct the groups
            if 'ansible' not in i:
                i['ansible'] = {}

            if len(groups) == 1:
                i['ansible']['group'] = groups[0]
            else:
                i['ansible']['group'] = sorted(groups)
def write_output(inventory, args):
    """Write the inventory as a YAML document.

    The document goes to STDOUT if args.stdout is set, otherwise it
    overwrites args.file. Exits with status 1 if the file cannot be opened
    or written.
    """
    if args.stdout:
        log.debug("Printing to STDOUT")
    else:
        log.debug("Printing to file")

    # Compose the document before the file gets truncated, so a failing
    # dump cannot destroy the existing inventory
    document = "---\n\n" + yaml.dump(
        inventory, Dumper=MyDumper, default_flow_style=False)

    if args.stdout:
        sys.stdout.write(document)

        return

    try:
        output = open(args.file, 'w')
    except IOError as e:
        log.error("Cannot open file '%s' for write.\n%s" % (args.file, e))
        sys.exit(1)

    try:
        # The file is closed even if the write fails
        with output:
            output.write(document)
    except IOError as e:
        log.error("Cannot write file '%s'.\n%s" % (args.file, e))
        sys.exit(1)
def main():
    """Update the YAML inventory with the VMs fetched from the vCenter."""
    global log
    global GROUPS

    # Parse command line arguments
    args = get_args()

    # Configure logger; it is named after the inventory file (no extension)
    inventory_name = os.path.splitext(os.path.basename(args.file))[0]
    log = logging.getLogger(inventory_name)
    logging.basicConfig(
        format='%(levelname)s: [%(name)s] %(message)s',
        level=logging.DEBUG if args.debug else logging.WARNING)

    # Normalize the filter option (empty filter means no filter)
    if not args.filter:
        args.filter = None

    # Read the config file
    GROUPS = read_config_file(args)

    # Regexp pattern (matches everything if there is no filter)
    pat = re.compile(args.filter or '', re.IGNORECASE)

    # Read the YAML inventory file
    inventory = read_yaml_file(args)

    # Get list of VMs from the vCenter and update the inventory with them
    update_vms(inventory, get_vms(args, pat))

    # Enrich groups based on grouping conditions
    enrich_groups(inventory, args, pat)

    # Clean up the inventory from all records which were not updated and
    # print out the final YAML inventory
    write_output(clear_inventory(inventory, args), args)


if __name__ == '__main__':
    main()
---

# vSphere hostname or IP
vcenter.example.com:
  group1:
    - # Only if the '_dc' parameter is 'DC1'
      _dc: DC1
      # Only if the Guest ID doesn't start with 'win' (using regexp)
      vcenter.guest_id: "!~win"
  group1-win:
    - # Only if the '_dc' parameter is 'DC1'
      _dc: DC1
      # Only if the Guest ID starts with 'win' (using regexp)
      vcenter.guest_id: ~win
      # Exclude the vCenter VM
      name: "!vcenter"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment