@joshzcold
Created November 3, 2022 20:37
testinfra: parse the Ansible inventory with the Ansible API before running tests.
#!/usr/bin/env python3
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import shutil
import subprocess
import ansible.constants as C
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from ansible import context
import argparse
import sys
import re
import os
parser = argparse.ArgumentParser(description='Load ansible data and execute tests')
parser.add_argument('-i', '--inventory', action="append", required=True, type=str, help='inventory directories')
parser.add_argument('-t', '--tests', action="append", required=False, type=str, help='tests or directories')
parser.add_argument('-r', '--report_dir', help='where to put pytest reports; reporting is skipped if not set')
parser.add_argument('-v', '--verbose', action="store_true", help="enable verbose logging for this script and py.test")
parser.add_argument('-n', '--parallel', type=int, help="tell py.test to run x number of parallel processes")
parser.add_argument('-o', '--junit_xml', help="output results to a junit xml report at path")
args = parser.parse_args()
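# Example invocation (a sketch; the script file name and paths are illustrative,
# not part of this gist):
#   ./run_testinfra_tests.py -i inventories/dev -t tests/ -n 4 -o reports/junit.xml -v

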
def remove_suffix(input_string, suffix):
    if suffix and input_string.endswith(suffix):
        return input_string[:-len(suffix)]
    return input_string


# Create a callback plugin so we can capture the output
class ResultsCollectorJSONCallback(CallbackBase):
    """A sample callback plugin used for performing an action as results come in.

    If you want to collect all results into a single object for processing at
    the end of the execution, look into utilizing the ``json`` callback plugin
    or writing your own custom callback plugin.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        host = result._host
        self.host_unreachable[host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store the result in an instance attribute for retrieval later."""
        host = result._host
        self.host_ok[host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        host = result._host
        self.host_failed[host.get_name()] = result


def pull_ansible_information(host_list: list):
    # since the API is constructed for CLI it expects certain options to always be set in the context object
    context.CLIARGS = ImmutableDict(connection='smart', module_path=['/usr/share/ansible'], forks=10, become=None,
                                    become_method=None, become_user=None, check=False, diff=False, verbosity=0)
    # initialize needed objects
    loader = DataLoader()  # Takes care of finding and reading yaml, json and ini files
    # Instantiate our ResultsCollectorJSONCallback for handling results as they come in.
    # Ansible expects this to be one of its main display outlets.
    results_callback = ResultsCollectorJSONCallback()
    # create inventory, use path to host config file as source or hosts in a comma separated string
    inventory = InventoryManager(loader=loader, sources=args.inventory)
    # variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
    variable_manager = VariableManager(loader=loader, inventory=inventory)
    # instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
    # IMPORTANT: This also adds library dirs paths to the module loader
    # IMPORTANT: and so it must be initialized before calling `Play.load()`.
    tqm = TaskQueueManager(
        inventory=inventory,
        variable_manager=variable_manager,
        passwords=None,
        loader=loader,
        stdout_callback=results_callback,  # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
    )
    # create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
    # use set_fact to pull ansible connection information
    play_source = dict(
        name="Ansible Play",
        hosts=host_list,
        gather_facts='no',
        tasks=[
            dict(action=dict(module='set_fact', ip="{{ ansible_host }}", user="{{ remote_user }}",
                             ssh_key="{{ ansible_ssh_private_key_file }}"), register='output'),
        ]
    )
    # Create play object, playbook objects use .load instead of init or new methods,
    # this will also automatically create the task objects from the info provided in play_source
    play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
    # Actually run it
    try:
        result = tqm.run(play)  # most interesting data for a play is actually sent to the callback's methods
    finally:
        # we always need to cleanup child procs and the structures we use to communicate with them
        tqm.cleanup()
        if loader:
            loader.cleanup_all_tmp_files()
    # Remove ansible tmpdir
    shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
    failed = False
    connection_dict = {}
    for host, result in results_callback.host_failed.items():
        print('{0} >>> {1}'.format(host, result._result['msg']))
        failed = True
    for host, result in results_callback.host_unreachable.items():
        print('{0} >>> {1}'.format(host, result._result['msg']))
        failed = True
    if failed:
        sys.exit(1)
    connection_dict['connections'] = {}
    for host, result in results_callback.host_ok.items():
        connection_dict['connections'][result._result['ansible_facts']['ip']] = {
            'ssh_key': result._result['ansible_facts']['ssh_key'],
            'host': host
        }
        connection_dict['ssh_key'] = result._result['ansible_facts']['ssh_key']
    return connection_dict


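# pull_ansible_information() returns a structure shaped roughly like the
# following (the IPs, host names and key path are illustrative only):
#
#   {
#       "connections": {
#           "10.0.0.5": {"ssh_key": "~/.ssh/id_rsa", "host": "web01"},
#           "10.0.0.6": {"ssh_key": "~/.ssh/id_rsa", "host": "db01"},
#       },
#       "ssh_key": "~/.ssh/id_rsa",
#   }

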
def main():
    test_targets = args.tests or ["tests/"]
    tests = []
    for t in test_targets:
        for root, subdirs, files in os.walk(t, followlinks=True):
            for f in files:
                if re.match(r'^test_.*\.py$', f):
                    tests.append(os.path.join(root, f))
    print("Running tests ======")
    print("\n".join(tests))
    for t in tests:
        # Import the test module by file name only; this assumes the directory
        # containing the test files is importable (e.g. on PYTHONPATH).
        im = __import__(remove_suffix(t.split('/')[-1], '.py'))
        groups = im.ANSIBLE_GROUPS  # constant variable in the test
        remote_user = im.ANSIBLE_REMOTE_USER  # constant variable in the test
        connection_dict = pull_ansible_information(groups)
        pytest_host_string = ""
        for k, value in connection_dict['connections'].items():
            pytest_host_string += f"{remote_user}@{k},"
        pytest_host_string = remove_suffix(pytest_host_string, ",")
        command = [
            "py.test",
            f"--hosts='{pytest_host_string}'",
            "--ssh-identity-file",
            os.path.expanduser(connection_dict['ssh_key']),
            "--sudo",
            "--color=yes",
            "--code-highlight=yes",
            "--connection",
            "paramiko",
        ]
        if args.verbose:
            command += [
                "-v",
                "-s"
            ]
        if args.junit_xml:
            command += [
                "--junit-xml",
                args.junit_xml
            ]
        if args.parallel:
            command += [
                "-n",
                str(args.parallel)
            ]
        command += [
            t
        ]
        print(' '.join(command))
        subprocess.run(' '.join(command), shell=True)


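# The assembled command ends up looking roughly like this (the user, hosts and
# key path come from the inventory, so the values here are illustrative):
#   py.test --hosts='deploy@10.0.0.5,deploy@10.0.0.6' --ssh-identity-file /home/user/.ssh/id_rsa \
#       --sudo --color=yes --code-highlight=yes --connection paramiko -n 4 tests/test_example.py

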
if __name__ == '__main__':
    main()
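

# A minimal sketch of a test module this runner expects to find under tests/.
# The group name, remote user and service checked below are assumptions for
# illustration; the script above only relies on the ANSIBLE_GROUPS and
# ANSIBLE_REMOTE_USER constants plus testinfra's `host` fixture.
#
#   # tests/test_example.py
#   ANSIBLE_GROUPS = ["webservers"]     # inventory groups to resolve connections for
#   ANSIBLE_REMOTE_USER = "deploy"      # user prepended to each host as user@ip
#
#   def test_sshd_running(host):
#       assert host.service("sshd").is_running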