Skip to content

Instantly share code, notes, and snippets.

@ajayhn
Last active January 30, 2018 21:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ajayhn/ed2dd82acf1c1c7af399 to your computer and use it in GitHub Desktop.
Save ajayhn/ed2dd82acf1c1c7af399 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# Example Usage:
# ./shell.py "ls /" quit
# ./shell.py "ls /project" quit
# ./shell.py "ls /project/default-domain:default-project" quit
# ./shell.py "ls /project/default-domain:default-project/virtual_networks" quit
# ./shell.py "cat /project/default-domain:default-project" quit
# ./shell.py "cat -p /project/default-domain:default-project" quit
# ./shell.py "cat /project/default-domain:admin/virtual_networks/default-domain:default-project:default-virtual-network" quit
test_cmds = [
"ls /",
"ls /project",
"ls /project/default-domain:default-project",
"ls /project/default-domain:default-project/virtual_networks",
"cat /project/default-domain:default-project",
"cat -p /project/default-domain:default-project",
"cat /project/default-domain:admin/virtual_networks/default-domain:default-project:default-virtual-network",
]
import sys
import logging
import os
import argparse
import ConfigParser
import cgitb
from pprint import pprint
from cmd2 import Cmd, options, make_option
import functools
from uuid import UUID
import json
import tempfile
import cStringIO
import inspect
from vnc_api import vnc_api
from vnc_api.gen.generatedssuper import GeneratedsSuper
logger = logging.getLogger(__name__)
class NoSuchDirectory(Exception):
pass
class IsADirectory(Exception):
pass
class NoSuchFile(Exception):
pass
class NoSuchCommand(Exception):
pass
def is_uuid_like(string):
try:
UUID(string)
return True
except ValueError:
return False
# end is_uuid_like
def read_config(cfg_parser, section, option, default=None):
try:
val = cfg_parser.get(section, option)
except (AttributeError,
ConfigParser.NoOptionError,
ConfigParser.NoSectionError):
val = default
return val
# end read_config
class RestFileSystems(object):
_path_prefix_to_driver = {}
cwd = None
def add_driver(self, prefix, driver):
self._path_prefix_to_driver[prefix] = driver
# end add_driver
def _get_fs(self, path_pfx):
# if path_pfx can match any registered fs pfx,
# return None, else match default fs(contrail)
logger.debug('Getting filesystem for %s', path_pfx)
if any(x for x in self._path_prefix_to_driver
if x.startswith(path_pfx)):
return self._path_prefix_to_driver.get(path_pfx)
return self._path_prefix_to_driver.get('/')
def _match_fs(self, path_pfx):
logger.debug('Matching %s in %s', path_pfx,
[pfx for pfx in self._path_prefix_to_driver])
return [pfx for pfx,drv in self._path_prefix_to_driver.items()
if pfx.startswith(path_pfx)]
def get_completions(self, partial_path):
# process last part in path
if self.cwd is None:
if partial_path[0] != '/':
logger.debug('No leading / and CWD None')
return []
logger.debug('No CWD')
fs = self._get_fs(partial_path)
if not fs:
fs_matches = self._match_fs(partial_path)
logger.debug('Matched file systems %s', fs_matches)
return fs_matches
dirname = os.path.dirname(partial_path)
partial_filename = os.path.basename(partial_path)
logger.debug('File system found: %s dirname %s basename %s',
fs, dirname, partial_filename)
files = fs.list_dir(dirname)
matching_files = [f for f in files or [] if f.startswith(partial_filename)]
logger.debug('Matching files: %s', matching_files)
return matching_files
# end get_completions
def ls(self, path, opts):
if self.cwd is None:
fs = self._get_fs(path)
if not fs:
logger.error('ls: No matching file-system for %s', path)
return []
files = fs.list_dir(path, opts)
logger.debug("ls command found files %s", files)
return files
# end ls
def cat(self, path, opts):
if self.cwd is None:
fs = self._get_fs(path)
if not fs:
logger.error('cat: No matching file-system for %s', path)
return []
content = fs.print_file(path, opts)
logger.debug("cat command found content %s", content)
return content
# end cat
def inspect(self, path, opts=None):
pass
# end inspect
# end class RestFileSystems
class ContrailDriver(object):
@classmethod
def get_config_api_connection(cls, api_conf_files):
config_parser = ConfigParser.SafeConfigParser()
if api_conf_files is None:
config_parser.read(
['/etc/contrail/contrail-api.conf',
'/etc/contrail/contrail-keystone-auth.conf'])
else:
config_parser.read(api_conf_files)
username = read_config(config_parser, 'KEYSTONE', 'admin_user')
password = read_config(config_parser, 'KEYSTONE', 'admin_password')
tenant_name = read_config(
config_parser, 'KEYSTONE', 'admin_tenant_name')
connection = vnc_api.VncApi(
username=username, password=password, tenant_name=tenant_name)
try:
cls.obj_types = [x.replace('-', '_')
for x in vnc_api.all_resource_types]
cls.obj_classes = dict((x.replace('-', '_'),
vnc_api.get_object_class(x))
for x in vnc_api.all_resource_types)
except AttributeError:
cls.obj_types = connnection._type_to_class.keys()
cls.obj_classes = connection._type_to_class
return connection
# end get_config_api_connection
@classmethod
def all_field_names(cls, obj_type):
obj_class = cls.obj_classes[obj_type]
return list(obj_class.prop_fields |
obj_class.children_fields |
obj_class.backref_fields)
# end all_field_names
#@classmethod
#def serialize_to_json(cls, obj, field_names=None):
# import pdb; pdb.set_trace()
# obj_class = cls.obj_classes[obj._type.replace('-', '_')]
# serialized = {}
# serialized.update(obj._serialize_to_json())
# for backref_name in obj_class.backref_fields:
# if hasattr(self, backref_name):
# obj._serialize_field_to_json(serialized, None, backref_name)
# for child_name in obj_class.children_fields:
# if hasattr(self, child_name):
# obj._serialize_field_to_json(serialized, None, child_name)
# return serialized
#end serialize_to_json
def __init__(self, args):
self.args = args
self.conn = self.get_config_api_connection(args.api_conf)
try:
self.obj_types = [x.replace('-', '_')
for x in vnc_api.all_resource_types]
self.obj_classes = dict((x.replace('-', '_'),
vnc_api.get_object_class(x))
for x in vnc_api.all_resource_types)
except AttributeError:
self.obj_types = self.conn._type_to_class.keys()
self.obj_classes = self.conn._type_to_class
def _root_lister():
return self.obj_types
self.listers = {'/': _root_lister}
for obj_type in self.obj_types:
lister = functools.partial(self.collection_list, obj_type)
functools.update_wrapper(lister, self.collection_list)
self.listers['/%s' %(obj_type)] = lister
logger.debug('Initializing, all object types: %s', self.obj_types)
# end __init__
def _object_path_to_parts(self, obj_path):
# obj_path is full path of object/attr. returns obj_key and attr_path
try:
obj_key = obj_path.split('/')[2]
except IndexError:
raise NoSuchFile('Not an object path: %s' %(obj_path))
if len(obj_path.split('/')) > 2:
attr_path = '/'.join(obj_path.split('/')[3:])
else:
attr_path = None
return obj_key, attr_path
# end _object_path_to_parts
def _object_read(self, obj_type, obj_key,
include_children=False, include_backrefs=False):
obj_class = self.obj_classes[obj_type]
fields = set([])
if include_children:
fields = fields | obj_class.children_fields
if include_backrefs:
fields = fields | obj_class.backref_fields
method = getattr(self.conn, '%s_read' %(obj_type))
if ':' in obj_key or not is_uuid_like(obj_key):
obj = method(fq_name=obj_key.split(':'), fields=list(fields))
logger.debug('Read object %s', obj)
else:
obj = method(id=obj_key, fields=list(fields))
logger.debug('Read object %s', obj)
return obj
# end _object_read
def list_dir(self, dirpath, opts=None):
# dir path is oneof /, /<type>, /<type>/<obj-fqn>
# or /<type>/<obj-fqn>/<attr-hierarchy>
norm_dirpath = os.path.normpath(dirpath)
logger.debug('Listing directory for %s', norm_dirpath)
if norm_dirpath in self.listers:
# direct match list types/collection
logger.debug('Found lister %s', self.listers[norm_dirpath])
return self.listers[norm_dirpath]()
for obj_type in self.obj_types:
if not norm_dirpath.startswith('/%s/' %(obj_type)):
continue
#include_children = opts.list_children if opts else True
#include_backrefs = opts.list_back_references if opts else True
include_children = True
include_backrefs = True
return self.object_list(obj_type, norm_dirpath,
include_children, include_backrefs)
raise NoSuchDirectory('No such directory %s' %(norm_dirpath))
# end list_dir
def collection_list(self, obj_type):
logger.debug('Listing objects %s', obj_type)
method = getattr(self.conn, '%ss_list' %(obj_type))
resources = method()
uuids_fqns = []
for res in resources['%ss' %(obj_type.replace('_', '-'))]:
uuids_fqns.append(res['uuid'])
uuids_fqns.append(':'.join(res['fq_name']))
return uuids_fqns
# end collection_list
def object_list(self, obj_type, obj_path,
include_children=False, include_backrefs=False):
# obj_path is always the full path of object/property
obj_key, attr_path = self._object_path_to_parts(obj_path)
logger.debug('Listing object %s %s', obj_type, obj_key)
obj = self._object_read(obj_type, obj_key,
include_children, include_backrefs)
if not attr_path:
# all top-level attrs of objects
ret_attrs = []
for attr in obj.__dict__.keys():
if attr == '_id_perms':
ret_attrs.append('id_perms')
continue
if attr.startswith('_'):
continue
ret_attrs.append(attr)
return ret_attrs
# if full object
attr_path_parts = attr_path.split('/')
# handle refs/backrefs
if (attr_path_parts[0] in obj.ref_fields or
attr_path_parts[0] in obj.backref_fields):
if len(attr_path_parts) == 1:
# list of refs/back_refs
refs = getattr(obj, attr_path_parts[0])
uuids = [ref['uuid'] for ref in refs]
fqns = [':'.join(ref['to']) for ref in refs]
return uuids + fqns
# listing a specific ref/backref
ref_type = attr_path_parts[0].replace(
'_back_refs', '').replace('_refs', '')
# generate abspath for ref and recurse
ref_path = '/%s/%s' %(ref_type, '/'.join(attr_path_parts[1:]))
return self.object_list(ref_type, ref_path,
include_children, include_backrefs)
# end ref handling
# handle children
if attr_path_parts[0] in obj.children_fields:
if len(attr_path_parts) == 1:
# list of children
children = getattr(obj, attr_path_parts[0])
uuids = [child['uuid'] for child in children]
fqns = [':'.join(child['to']) for child in children]
return uuids + fqns
# listing a child
child_type = attr_path_parts[0][:-1] # strip plural
# generate abspath for child and recurse
child_path = '/%s/%s' %(child_type, '/'.join(attr_path_parts[1:]))
return self.object_list(child_type, child_path,
include_children, include_backrefs)
# end children handling
try:
attr = getattr(obj, '.'.join(attr_path_parts))
except AttributeError:
return None
logger.debug('Reading attr_path %s value %s', attr_path, attr)
if isinstance(attr, GeneratedsSuper):
return attr.__dict__.keys()
# attr is a leaf
return []
# end object_list
def print_file(self, file_path, opts):
# file_path is /<type>/<fqn>[/<attr-hierarchy]
norm_file_path = os.path.normpath(file_path)
logger.debug('Printing file for %s', norm_file_path)
for obj_type in self.obj_types:
if not norm_file_path.startswith('/%s/' %(obj_type)):
continue
#include_children = opts.read_children if opts else True
#include_backrefs = opts.read_back_references if opts else True
include_children = True
include_backrefs = True
return self.object_read(obj_type, norm_file_path,
include_children, include_backrefs)
raise NoSuchFile('No such file: %s' %(norm_file_path))
# end print_file
def object_read(self, obj_type, obj_path,
include_children=True, include_backrefs=True):
# obj_path is always the full path of object/property
# returns json representation of object
obj_key, attr_path = self._object_path_to_parts(obj_path)
logger.debug('Reading object %s %s', obj_type, obj_key)
obj = self._object_read(obj_type, obj_key,
include_children, include_backrefs)
if not attr_path:
#obj_json = self.serialize_to_json(obj)
return json.dumps(obj.serialize_to_json(),
default=lambda o: dict((k,v) for k,v in o.__dict__.iteritems()),
sort_keys=True, indent=4)
attr_path_parts = attr_path.split('/')
# handle refs/backrefs
if (attr_path_parts[0] in obj.ref_fields or
attr_path_parts[0] in obj.backref_fields):
try:
refs = getattr(obj, attr_path_parts[0])
except AttributeError:
raise NoSuchFile('No such object reference %s' %(obj_path))
ref_type = attr_path_parts[0].replace(
'_back_refs', '').replace('_refs', '')
if len(attr_path_parts) == 1:
# printing all ref/backref
refs_content = []
# find abspath of all refs and recurse
for ref in refs:
ref_path = '/%s/%s' %(ref_type, ref['uuid'])
refs_content.append(self.object_read(
ref_type, ref_path,
include_children, include_backrefs))
return ','.join(refs_content)
# end all refs of a type
# attr hierarchy within ref
# generate abspath for ref and recurse
ref_path = '/%s/%s' %(ref_type, '/'.join(attr_path_parts[1:]))
return self.object_read(ref_type, ref_path,
include_children, include_backrefs)
# end handling refs/backrefs
# handle children
if attr_path_parts[0] in obj.children_fields:
try:
children = getattr(obj, attr_path_parts[0])
except AttributeError:
raise NoSuchFile('No such object child %s' %(obj_path))
child_type = attr_path_parts[0][:-1] # strip plural
if len(attr_path_parts) == 1:
# printing all children
children_content = []
# find abspath of all children and recurse
for child in children:
child_path = '/%s/%s' %(child_type, child['uuid'])
children_content.append(self.object_read(
child_type, child_path,
include_children, include_backrefs))
return ','.join(children_content)
# end all children of a type
# listing a specific child
# generate abspath for child and recurse
child_path = '/%s/%s' %(child_type, '/'.join(attr_path_parts[1:]))
return self.object_read(child_type, child_path,
include_children, include_backrefs)
# end handling children
try:
attr = getattr(obj, '.'.join(attr_path_parts))
except AttributeError:
return None
logger.debug('Reading attr_path %s value %s', attr_path, attr)
if isinstance(attr, GeneratedsSuper):
attr_json = json.dumps(attr,
default=lambda o: dict((k,v) for k,v in o.__dict__.iteritems()))
return attr_json
else:
return json.dumps(attr)
raise NoSuchFile('No such object file %s' %(obj_path))
# end object_read
# end class ContrailDriver
class NeutronDriver(object):
def __init__(self, args=None):
self.args = args
# end class NeutronDriver
class Diagnostics(object):
def __init__(self, args, filesystems):
self.args = args
self.fs = filesystems
self.diag_cmds = [m[0].replace('diag_','')
for m in inspect.getmembers(
self, predicate=inspect.ismethod)
if m[0].startswith('diag_')]
self.contrail_config_api = ContrailDriver.get_config_api_connection(
args.api_conf)
# end __init__
def all_field_names(self, obj_type):
return ContrailDriver.all_field_names(obj_type)
# end all_field_names
def get_completions(self, text, line, begidx, endidx):
# process last part in line
line_parts = line.split()
if len(line_parts) == 1:
# trying to find all possible diag commands
#diag_cmds = [m[0].replace('diag_','')
# for m in inspect.getmembers(
# self, predicate=inspect.ismethod)
# if m[0].startswith('diag_')]
return self.diag_cmds
partial_input = line.split()[-1]
if (len(line_parts) == 2) and line[-1] != ' ':
# trying to match specific diag command
diag_cmd_pfx = partial_input
return [c for c in self.diag_cmds if c.startswith(partial_input)]
# diag command known, complete custom or as a file
diag_cmd = line_parts[1]
if diag_cmd not in self.diag_cmds:
return []
custom_completion = 'completion_%s' %(diag_cmd)
if hasattr(self, custom_completion):
return getattr(self, custom_completion)(text, line, begidx, endidx)
# default to complete as a file
matches = self.fs.get_completions(partial_input)
partial_base = os.path.basename(partial_input)
return [text + x.replace(partial_base, '', 1) for x in matches]
# end get_completions
def diag_vm_has_no_ip(self, arg_list):
vm_uuid = arg_list[0].split('/')[-1]
summary_report = []
detailed_report = []
detailed_report.append('Inspected VM: %s' %(
self.fs.inspect('/virtual-machine/%s' %(vm_uuid))))
try:
vm_obj = self.contrail_config_api.virtual_machine_read(
id=vm_uuid,
fields=self.all_field_names('virtual_machine'))
except vnc_api.NoIdError as e:
raise DiagException(summary_report, detailed_report, str(e))
vmi_uuids = [vmi['uuid'] for vmi in vm_obj.get_virtual_machine_interface_back_refs()]
vmi_infos = []
for vmi_uuid in vmi_uuids:
detailed_report.append('Inspected VMI: %s' %(
self.fs.inspect('/virtual-machine-interface/%s' %(vmi_uuid))))
vmi_obj = self.contrail_config_api.virtual_machine_interface_read(id=vmi_uuid)
vmi_info = {'vmi_name': vmi_obj.name,
'has_ri_ref': False,
'instance-ips': [],
'floating-ips': []}
# TODO: add check to see if right ri is ref'd
if vmi_obj.get_routing_instance_refs():
vmi_info['has_ri_ref'] = True
for ri_ref in vmi_obj.get_routing_instance_refs():
detailed_report.append('Inspected RI: %s' %(
self.fs.inspect('/routing-instance/%s' %(ri_ref['uuid']))))
else:
summary_report.append(
'ERROR: VMI %s has no routing instance reference' %(vmi_uuid))
for iip_back_ref in vmi_obj.get_instance_ip_back_refs() or []:
iip_uuid = iip_back_ref['uuid']
detailed_report.append('Inspected IIP: %s' %(
self.fs.inspect('/instance-ip/%s' %(iip_uuid))))
iip_obj = self.contrail_config_api.instance_ip_read(
id=iip_uuid)
vmi_info['instance-ips'].append(iip_obj.instance_ip_address)
for fip_back_ref in vmi_obj.get_floating_ip_back_refs() or []:
fip_uuid = fip_back_ref['uuid']
detailed_report.append('Inspected FIP: %s' %(
self.fs.inspect('/floating-ip/%s' %(fip_uuid))))
fip_obj = self.contrail_config_api.floating_ip_read(
id=fip_uuid)
vmi_info['floating-ips'].append(fip_obj.floating_ip_address)
vmi_infos.append(vmi_info)
# end for all vmi_uuids
vmis_present = "VM has %s interfaces: %s" %(
len(vmi_infos), pprint(vmi_infos))
summary_report.append(vmis_present)
def data_model_assertions():
return "Data Model Checks: PASS"
def data_presence_assertions():
return "Data Presence Checks: PASS"
summary_report.append(data_model_assertions())
summary_report.append(data_presence_assertions())
return summary_report + detailed_report
# end diag_vm_has_no_ip
def execute(self, diag_cmd, arg_list):
try:
return pprint(getattr(self, 'diag_%s' %(diag_cmd))(arg_list))
except DiagException as e:
return 'Diagnostic Exception: %s' %(str(e))
# end execute
# end class Diagnostics
class DiagException(Exception):
def __init__(self, summary, detailed, error):
self.summary = summary
self.detailed = detailed
self.error = error
# end __init__
def __str__(self):
return 'Error: %s\nSummary-Report: %s\nDetailed-Report: %s\n' %(
self.error, self.summary, self.detailed)
# end __str__
# end class DiagException
class Shell(Cmd):
def initialize(self, args, filesystems, diagnostics):
self.filesystems = filesystems
self.diagnostics = diagnostics
# end initialize
def do_EOF(self, line):
return True
# end do_EOF
def default(self, line):
print line
# end default
def completedefault(self, text, line, begidx, endidx):
# process last path in line
logger.debug('text: %s', text)
logger.debug('line: %s', line)
logger.debug('last: %s', line.split()[-1])
logger.debug('begidx: %s', begidx)
logger.debug('endidx: %s', endidx)
partial_path = line.split()[-1]
partial_base = os.path.basename(partial_path)
matches = self.filesystems.get_completions(partial_path)
logger.debug('\nmatches: %s, text: %s, pb: %s, begidx: %s, endidx: %s',
matches, text, partial_base, begidx, endidx)
# for each match found strip partial_base and prepend entered text
return [text + x.replace(partial_base, '', 1) for x in matches]
#cmd = line.split()[0].strip().lower()
#if cmd in set(['ls', 'cat']):
# partial_path = line.split()[-1]
# partial_base = os.path.basename(partial_path)
# matches = self.filesystems.get_completions(partial_path)
# logger.debug('\nmatches: %s, text: %s, pb: %s, begidx: %s, endidx: %s',
# matches, text, partial_base, begidx, endidx)
# # for each match found strip partial_base and prepend entered text
# return [text + x.replace(partial_base, '', 1) for x in matches]
#elif cmd == 'diagnose':
# partial_input = line.split()[-1]
# matches = self.diagnostics.get_completions(line, partial_input)
# return matches
# end completedefault
def complete_diagnose(self, text, line, begidx, endidx):
matches = self.diagnostics.get_completions(text, line, begidx, endidx)
return matches
#@options([
# make_option('-b', '--list-back-references', action="store_true"),
# make_option('-c', '--list-children', action="store_true"),
# ])
def do_ls(self, line, opts=None):
entries = self.filesystems.ls(line, opts)
if entries is None:
return
for e in entries:
print e
# end do_ls
@options([
make_option('-b', '--read-back-references', action="store_true"),
make_option('-c', '--read-children', action="store_true"),
make_option('-p', '--output-python-dict', action="store_true"),
])
def do_cat(self, line, opts=None):
content = self.filesystems.cat(line, opts)
if not content:
return
if opts.output_python_dict:
try:
pprint(json.loads(content))
except Exception as e:
logger.error('Exception: %s for content: %s',
str(e), content)
else:
print content
# end do_cat
def do_inspect(self, line, opts=None):
pass
# end do_inspect
def do_diagnose(self, line, opts=None):
line_parts = line.split()
if not line_parts:
print 'Choose one of %s' %(self.diagnostics.diag_cmds)
return
diag_cmd = line_parts[0]
if diag_cmd not in self.diagnostics.diag_cmds:
print '%s: Invalid diagnostic command' %(diag_cmd)
return
if len(line_parts) > 1:
cmd_opts = line_parts[1:]
else:
cmd_opts = None
diag_resp = self.diagnostics.execute(
diag_cmd, cmd_opts)
print diag_resp
# end do_diagnose
# end class Shell
def self_test(shell):
with tempfile.NamedTemporaryFile() as test_f:
def test_cmds_to_file():
for cmd in test_cmds:
test_f.write('%s\n' %(cmd))
test_f.flush()
return test_f
# end test_cmds_to_file
test_cmds_to_file()
shell.do_load(test_f.name)
test_f.close()
return
# end self_test
def main(args_str=None):
cgitb.enable(format='text')
logger.setLevel('ERROR')
logformat = logging.Formatter("%(levelname)s: %(message)s")
stdout = logging.StreamHandler()
stdout.setLevel('ERROR')
stdout.setFormatter(logformat)
logger.addHandler(stdout)
parser = argparse.ArgumentParser()
help="Path to contrail-api conf files," \
" default /etc/contrail-api.conf /etc/contrail/contrail-keystone-auth.conf"
parser.add_argument(
"--api-conf", action='append', help=help, metavar="FILE")
parser.add_argument(
"--self-test", action='store_true', help="Test shell commands")
args, remaining_argv = parser.parse_known_args(
args_str.split() if args_str else [])
shell = Shell()
shell.debug = True
shell.prompt = 'contrail> '
fs = RestFileSystems()
c_drv = ContrailDriver(args)
fs.add_driver('/', c_drv)
fs.add_driver('/neutron', NeutronDriver(args))
diags = Diagnostics(args, fs)
shell.initialize(args, fs, diags)
if args.self_test:
return self_test(shell)
shell.cmdloop()
# end main
if __name__ == '__main__':
main(' '.join(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment