# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import operator
import optparse
import os
import sys
import time
import yaml
import re
import getpass
import subprocess
from abc import ABCMeta, abstractmethod
from ansible.release import __version__
from ansible import constants as C
from ansible.compat.six import with_metaclass
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils._text import to_bytes, to_text
from ansible.utils.path import unfrackpath
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
class SortedOptParser(optparse.OptionParser):
'''Optparser which sorts the options by opt before outputting --help'''
#FIXME: epilog parsing: OptionParser.format_epilog = lambda self, formatter: self.epilog
def format_help(self, formatter=None, epilog=None):
self.option_list.sort(key=operator.methodcaller('get_opt_string'))
return optparse.OptionParser.format_help(self, formatter=None)
# Note: Inherit from SortedOptParser so that we get our format_help method
class InvalidOptsParser(SortedOptParser):
'''Ignore invalid options.
Meant for the special case where we need to take care of help and version
but may not know the full range of options yet. (See it in use in set_action)
'''
def __init__(self, parser):
# Since this is special purposed to just handle help and version, we
# take a pre-existing option parser here and set our options from
# that. This allows us to give accurate help based on the given
# option parser.
SortedOptParser.__init__(self, usage=parser.usage,
option_list=parser.option_list,
option_class=parser.option_class,
conflict_handler=parser.conflict_handler,
description=parser.description,
formatter=parser.formatter,
add_help_option=False,
prog=parser.prog,
epilog=parser.epilog)
self.version=parser.version
def _process_long_opt(self, rargs, values):
try:
optparse.OptionParser._process_long_opt(self, rargs, values)
except optparse.BadOptionError:
pass
def _process_short_opts(self, rargs, values):
try:
optparse.OptionParser._process_short_opts(self, rargs, values)
except optparse.BadOptionError:
pass
class CLI(with_metaclass(ABCMeta, object)):
''' code behind bin/ansible* programs '''
VALID_ACTIONS = ['No Actions']
_ITALIC = re.compile(r"I\(([^)]+)\)")
_BOLD = re.compile(r"B\(([^)]+)\)")
_MODULE = re.compile(r"M\(([^)]+)\)")
_URL = re.compile(r"U\(([^)]+)\)")
_CONST = re.compile(r"C\(([^)]+)\)")
PAGER = 'less'
# -F (quit-if-one-screen) -R (allow raw ansi control chars)
# -S (chop long lines) -X (disable termcap init and de-init)
LESS_OPTS = 'FRSX'
def __init__(self, args, callback=None):
"""
Base init method for all command line programs
"""
self.args = args
self.options = None
self.parser = None
self.action = None
self.callback = callback
def set_action(self):
"""
Get the action the user wants to execute from the sys argv list.
"""
for i in range(0, len(self.args)):
arg = self.args[i]
if arg in self.VALID_ACTIONS:
self.action = arg
del self.args[i]
break
if not self.action:
# if we're asked for help or version, we don't need an action.
# have to use a special purpose Option Parser to figure that out as
# the standard OptionParser throws an error for unknown options and
# without knowing action, we only know of a subset of the options
# that could be legal for this command
tmp_parser = InvalidOptsParser(self.parser)
tmp_options, tmp_args = tmp_parser.parse_args(self.args)
if not ((hasattr(tmp_options, 'help') and tmp_options.help) or (hasattr(tmp_options, 'version') and tmp_options.version)):
raise AnsibleOptionsError("Missing required action")
def execute(self):
"""
Actually runs a child defined method using the execute_<action> pattern
"""
fn = getattr(self, "execute_%s" % self.action)
fn()
@abstractmethod
def run(self):
"""Run the ansible command
Subclasses must implement this method. It does the actual work of
running an Ansible command.
"""
if self.options.verbosity > 0:
if C.CONFIG_FILE:
display.display(u"Using %s as config file" % to_text(C.CONFIG_FILE))
else:
display.display(u"No config file found; using defaults")
display.display(u"HELLO DEBUG")
@staticmethod
def ask_vault_passwords():
''' prompt for vault password and/or password change '''
vault_pass = None
try:
vault_pass = getpass.getpass(prompt="Vault password: ")
except EOFError:
pass
# enforce no newline chars at the end of passwords
if vault_pass:
vault_pass = to_bytes(vault_pass, errors='surrogate_or_strict', nonstring='simplerepr').strip()
return vault_pass
@staticmethod
def ask_new_vault_passwords():
new_vault_pass = None
try:
new_vault_pass = getpass.getpass(prompt="New Vault password: ")
new_vault_pass2 = getpass.getpass(prompt="Confirm New Vault password: ")
if new_vault_pass != new_vault_pass2:
raise AnsibleError("Passwords do not match")
except EOFError:
pass
if new_vault_pass:
new_vault_pass = to_bytes(new_vault_pass, errors='surrogate_or_strict', nonstring='simplerepr').strip()
return new_vault_pass
def ask_passwords(self):
''' prompt for connection and become passwords if needed '''
op = self.options
sshpass = None
becomepass = None
become_prompt = ''
try:
if op.ask_pass:
sshpass = getpass.getpass(prompt="SSH password: ")
become_prompt = "%s password[defaults to SSH password]: " % op.become_method.upper()
if sshpass:
sshpass = to_bytes(sshpass, errors='strict', nonstring='simplerepr')
else:
become_prompt = "%s password: " % op.become_method.upper()
if op.become_ask_pass:
becomepass = getpass.getpass(prompt=become_prompt)
if op.ask_pass and becomepass == '':
becomepass = sshpass
if becomepass:
becomepass = to_bytes(becomepass)
except EOFError:
pass
return (sshpass, becomepass)
def normalize_become_options(self):
''' this keeps backwards compatibility with sudo/su self.options '''
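# Fold the legacy sudo/su prompt and user options into the consolidated
# 'become' options; an explicit --become wins, otherwise --sudo/--su map
# onto become with the matching become_method below.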
self.options.become_ask_pass = self.options.become_ask_pass or self.options.ask_sudo_pass or self.options.ask_su_pass or C.DEFAULT_BECOME_ASK_PASS
self.options.become_user = self.options.become_user or self.options.sudo_user or self.options.su_user or C.DEFAULT_BECOME_USER
if self.options.become:
pass
elif self.options.sudo:
self.options.become = True
self.options.become_method = 'sudo'
elif self.options.su:
self.options.become = True
self.options.become_method = 'su'
def validate_conflicts(self, vault_opts=False, runas_opts=False, fork_opts=False):
''' check for conflicting options '''
op = self.options
if vault_opts:
# Check for vault related conflicts
if (op.ask_vault_pass and op.vault_password_file):
self.parser.error("--ask-vault-pass and --vault-password-file are mutually exclusive")
if runas_opts:
# Check for privilege escalation conflicts
if (op.su or op.su_user) and (op.sudo or op.sudo_user) or \
(op.su or op.su_user) and (op.become or op.become_user) or \
(op.sudo or op.sudo_user) and (op.become or op.become_user):
self.parser.error("Sudo arguments ('--sudo', '--sudo-user', and '--ask-sudo-pass') "
"and su arguments ('-su', '--su-user', and '--ask-su-pass') "
"and become arguments ('--become', '--become-user', and '--ask-become-pass')"
" are exclusive of each other")
if fork_opts:
if op.forks < 1:
self.parser.error("The number of processes (--forks) must be >= 1")
@staticmethod
def expand_tilde(option, opt, value, parser):
setattr(parser.values, option.dest, os.path.expanduser(value))
@staticmethod
def unfrack_path(option, opt, value, parser):
setattr(parser.values, option.dest, unfrackpath(value))
@staticmethod
def expand_paths(option, opt, value, parser):
"""optparse action callback to convert a PATH style string arg to a list of path strings.
For example, a CLI arg of '-p /blip/foo:/foo/bar' would be split on the
default os.pathsep and the option value would be set to
the list ['/blip/foo', '/foo/bar']. Each path string in the list
will also have '~/' values expanded via os.path.expanduser()."""
path_entries = value.split(os.pathsep)
expanded_path_entries = [os.path.expanduser(path_entry) for path_entry in path_entries]
setattr(parser.values, option.dest, expanded_path_entries)
@staticmethod
def base_parser(usage="", output_opts=False, runas_opts=False, meta_opts=False, runtask_opts=False, vault_opts=False, module_opts=False,
async_opts=False, connect_opts=False, subset_opts=False, check_opts=False, inventory_opts=False, epilog=None, fork_opts=False, runas_prompt_opts=False):
''' create an options parser for most ansible scripts '''
# TODO: implement epilog parsing
# OptionParser.format_epilog = lambda self, formatter: self.epilog
# base opts
parser = SortedOptParser(usage, version=CLI.version("%prog"))
parser.add_option('-v','--verbose', dest='verbosity', default=C.DEFAULT_VERBOSITY, action="count",
help="verbose mode (-vvv for more, -vvvv to enable connection debugging)")
if inventory_opts:
parser.add_option('-i', '--inventory-file', dest='inventory',
help="specify inventory host path (default=%s) or comma separated host list." % C.DEFAULT_HOST_LIST,
default=C.DEFAULT_HOST_LIST, action="callback", callback=CLI.expand_tilde, type=str)
parser.add_option('--list-hosts', dest='listhosts', action='store_true',
help='outputs a list of matching hosts; does not execute anything else')
parser.add_option('-l', '--limit', default=C.DEFAULT_SUBSET, dest='subset',
help='further limit selected hosts to an additional pattern')
if module_opts:
parser.add_option('-M', '--module-path', dest='module_path', default=None,
help="specify path(s) to module library (default=%s)" % C.DEFAULT_MODULE_PATH,
action="callback", callback=CLI.expand_tilde, type=str)
if runtask_opts:
parser.add_option('-e', '--extra-vars', dest="extra_vars", action="append",
help="set additional variables as key=value or YAML/JSON", default=[])
if fork_opts:
parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int',
help="specify number of parallel processes to use (default=%s)" % C.DEFAULT_FORKS)
if vault_opts:
parser.add_option('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true',
help='ask for vault password')
parser.add_option('--vault-password-file', default=C.DEFAULT_VAULT_PASSWORD_FILE, dest='vault_password_file',
help="vault password file", action="callback", callback=CLI.expand_tilde, type=str)
parser.add_option('--new-vault-password-file', dest='new_vault_password_file',
help="new vault password file for rekey", action="callback", callback=CLI.expand_tilde, type=str)
parser.add_option('--output', default=None, dest='output_file',
help='output file name for encrypt or decrypt; use - for stdout',
action="callback", callback=CLI.expand_tilde, type=str)
if subset_opts:
parser.add_option('-t', '--tags', dest='tags', default=[], action='append',
help="only run plays and tasks tagged with these values")
parser.add_option('--skip-tags', dest='skip_tags', default=[], action='append',
help="only run plays and tasks whose tags do not match these values")
if output_opts:
parser.add_option('-o', '--one-line', dest='one_line', action='store_true',
help='condense output')
parser.add_option('-t', '--tree', dest='tree', default=None,
help='log output to this directory')
if connect_opts:
connect_group = optparse.OptionGroup(parser, "Connection Options", "control as whom and how to connect to hosts")
connect_group.add_option('-k', '--ask-pass', default=C.DEFAULT_ASK_PASS, dest='ask_pass', action='store_true',
help='ask for connection password')
connect_group.add_option('--private-key','--key-file', default=C.DEFAULT_PRIVATE_KEY_FILE, dest='private_key_file',
help='use this file to authenticate the connection',
action="callback", callback=CLI.unfrack_path, type=str)
connect_group.add_option('-u', '--user', default=C.DEFAULT_REMOTE_USER, dest='remote_user',
help='connect as this user (default=%s)' % C.DEFAULT_REMOTE_USER)
connect_group.add_option('-c', '--connection', dest='connection', default=C.DEFAULT_TRANSPORT,
help="connection type to use (default=%s)" % C.DEFAULT_TRANSPORT)
connect_group.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int', dest='timeout',
help="override the connection timeout in seconds (default=%s)" % C.DEFAULT_TIMEOUT)
connect_group.add_option('--ssh-common-args', default='', dest='ssh_common_args',
help="specify common arguments to pass to sftp/scp/ssh (e.g. ProxyCommand)")
connect_group.add_option('--sftp-extra-args', default='', dest='sftp_extra_args',
help="specify extra arguments to pass to sftp only (e.g. -f, -l)")
connect_group.add_option('--scp-extra-args', default='', dest='scp_extra_args',
help="specify extra arguments to pass to scp only (e.g. -l)")
connect_group.add_option('--ssh-extra-args', default='', dest='ssh_extra_args',
help="specify extra arguments to pass to ssh only (e.g. -R)")
parser.add_option_group(connect_group)
runas_group = None
rg = optparse.OptionGroup(parser, "Privilege Escalation Options", "control how and which user you become as on target hosts")
if runas_opts:
runas_group = rg
# the privileged user defaults to root later on, so that we can detect here whether the option was explicitly given
runas_group.add_option("-s", "--sudo", default=C.DEFAULT_SUDO, action="store_true", dest='sudo',
help="run operations with sudo (nopasswd) (deprecated, use become)")
runas_group.add_option('-U', '--sudo-user', dest='sudo_user', default=None,
help='desired sudo user (default=root) (deprecated, use become)')
runas_group.add_option('-S', '--su', default=C.DEFAULT_SU, action='store_true',
help='run operations with su (deprecated, use become)')
runas_group.add_option('-R', '--su-user', default=None,
help='run operations with su as this user (default=%s) (deprecated, use become)' % C.DEFAULT_SU_USER)
# consolidated privilege escalation (become)
runas_group.add_option("-b", "--become", default=C.DEFAULT_BECOME, action="store_true", dest='become',
help="run operations with become (does not imply password prompting)")
runas_group.add_option('--become-method', dest='become_method', default=C.DEFAULT_BECOME_METHOD, type='choice', choices=C.BECOME_METHODS,
help="privilege escalation method to use (default=%s), valid choices: [ %s ]" % (C.DEFAULT_BECOME_METHOD, ' | '.join(C.BECOME_METHODS)))
runas_group.add_option('--become-user', default=None, dest='become_user', type='string',
help='run operations as this user (default=%s)' % C.DEFAULT_BECOME_USER)
if runas_opts or runas_prompt_opts:
if not runas_group:
runas_group = rg
runas_group.add_option('--ask-sudo-pass', default=C.DEFAULT_ASK_SUDO_PASS, dest='ask_sudo_pass', action='store_true',
help='ask for sudo password (deprecated, use become)')
runas_group.add_option('--ask-su-pass', default=C.DEFAULT_ASK_SU_PASS, dest='ask_su_pass', action='store_true',
help='ask for su password (deprecated, use become)')
runas_group.add_option('-K', '--ask-become-pass', default=False, dest='become_ask_pass', action='store_true',
help='ask for privilege escalation password')
if runas_group:
parser.add_option_group(runas_group)
if async_opts:
parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int', dest='poll_interval',
help="set the poll interval if using -B (default=%s)" % C.DEFAULT_POLL_INTERVAL)
parser.add_option('-B', '--background', dest='seconds', type='int', default=0,
help='run asynchronously, failing after X seconds (default=N/A)')
if check_opts:
parser.add_option("-C", "--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
parser.add_option('--syntax-check', dest='syntax', action='store_true',
help="perform a syntax check on the playbook, but do not execute it")
parser.add_option("-D", "--diff", default=False, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those files; works great with --check")
if meta_opts:
parser.add_option('--force-handlers', default=C.DEFAULT_FORCE_HANDLERS, dest='force_handlers', action='store_true',
help="run handlers even if a task fails")
parser.add_option('--flush-cache', dest='flush_cache', action='store_true',
help="clear the fact cache")
return parser
@abstractmethod
def parse(self):
"""Parse the command line args
This method parses the command line arguments. It uses the parser
stored in the self.parser attribute and saves the args and options in
self.args and self.options respectively.
Subclasses need to implement this method. They will usually create
a base_parser, add their own options to the base_parser, and then call
this method to do the actual parsing. An implementation will look
something like this::
def parse(self):
parser = super(MyCLI, self).base_parser(usage="My Ansible CLI", inventory_opts=True)
parser.add_option('--my-option', dest='my_option', action='store')
self.parser = parser
super(MyCLI, self).parse()
# If some additional transformations are needed for the
# arguments and options, do it here.
"""
self.options, self.args = self.parser.parse_args(self.args[1:])
if hasattr(self.options, 'tags') and not self.options.tags:
# optparse defaults do not do what's expected
self.options.tags = ['all']
if hasattr(self.options, 'tags') and self.options.tags:
if not C.MERGE_MULTIPLE_CLI_TAGS:
if len(self.options.tags) > 1:
display.deprecated('Specifying --tags multiple times on the command line currently uses the last specified value. In 2.4, values will be merged instead. Set merge_multiple_cli_tags=True in ansible.cfg to get this behavior now.', version=2.5, removed=False)
self.options.tags = [self.options.tags[-1]]
tags = set()
for tag_set in self.options.tags:
for tag in tag_set.split(u','):
tags.add(tag.strip())
self.options.tags = list(tags)
if hasattr(self.options, 'skip_tags') and self.options.skip_tags:
if not C.MERGE_MULTIPLE_CLI_TAGS:
if len(self.options.skip_tags) > 1:
display.deprecated('Specifying --skip-tags multiple times on the command line currently uses the last specified value. In 2.4, values will be merged instead. Set merge_multiple_cli_tags=True in ansible.cfg to get this behavior now.', version=2.5, removed=False)
self.options.skip_tags = [self.options.skip_tags[-1]]
skip_tags = set()
for tag_set in self.options.skip_tags:
for tag in tag_set.split(u','):
skip_tags.add(tag.strip())
self.options.skip_tags = list(skip_tags)
@staticmethod
def version(prog):
''' return ansible version '''
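# Produces a multi-line string: "<prog> <version>" plus git branch/commit info
# when running from a checkout, followed by the config file in use, the
# configured module search path and the python version (exact content depends
# on the install).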
result = "{0} {1}".format(prog, __version__)
gitinfo = CLI._gitinfo()
if gitinfo:
result = result + " {0}".format(gitinfo)
result += "\n config file = %s" % C.CONFIG_FILE
if C.DEFAULT_MODULE_PATH is None:
cpath = "Default w/o overrides"
else:
cpath = C.DEFAULT_MODULE_PATH
result = result + "\n configured module search path = %s" % cpath
result = result + "\n python version = %s" % ''.join(sys.version.splitlines())
return result
@staticmethod
def version_info(gitinfo=False):
''' return full ansible version info '''
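# Returns a dict such as (illustrative):
#   {'string': '2.2.1.0', 'full': '2.2.1.0', 'major': 2, 'minor': 2, 'revision': 1}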
if gitinfo:
# expensive call, use with care
ansible_version_string = CLI.version('')
else:
ansible_version_string = __version__
ansible_version = ansible_version_string.split()[0]
ansible_versions = ansible_version.split('.')
for counter in range(len(ansible_versions)):
if ansible_versions[counter] == "":
ansible_versions[counter] = 0
try:
ansible_versions[counter] = int(ansible_versions[counter])
except:
pass
if len(ansible_versions) < 3:
for counter in range(len(ansible_versions), 3):
ansible_versions.append(0)
return {'string': ansible_version_string.strip(),
'full': ansible_version,
'major': ansible_versions[0],
'minor': ansible_versions[1],
'revision': ansible_versions[2]}
@staticmethod
def _git_repo_info(repo_path):
''' returns a string containing git branch, commit id and commit date '''
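# Reads .git/HEAD and the branch ref file directly rather than shelling out
# to git; when .git is a file (submodule-style layout) it follows the
# 'gitdir:' pointer inside it to find the real repository directory.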
result = None
if os.path.exists(repo_path):
# Check if the .git is a file. If it is a file, it means that we are in a submodule structure.
if os.path.isfile(repo_path):
try:
gitdir = yaml.safe_load(open(repo_path)).get('gitdir')
# The gitdir referenced in the .git file may be an absolute path.
if os.path.isabs(gitdir):
repo_path = gitdir
else:
repo_path = os.path.join(repo_path[:-4], gitdir)
except (IOError, AttributeError):
return ''
f = open(os.path.join(repo_path, "HEAD"))
line = f.readline().rstrip("\n")
if line.startswith("ref:"):
branch_path = os.path.join(repo_path, line[5:])
else:
branch_path = None
f.close()
if branch_path and os.path.exists(branch_path):
branch = '/'.join(line.split('/')[2:])
f = open(branch_path)
commit = f.readline()[:10]
f.close()
else:
# detached HEAD
commit = line[:10]
branch = 'detached HEAD'
branch_path = os.path.join(repo_path, "HEAD")
date = time.localtime(os.stat(branch_path).st_mtime)
if time.daylight == 0:
offset = time.timezone
else:
offset = time.altzone
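# time.timezone/altzone are seconds west of UTC; dividing by -36 converts
# that into a signed hours*100 value (e.g. 18000 -> -500 for UTC-5) for the
# "GMT %+04d" style display below.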
result = "({0} {1}) last updated {2} (GMT {3:+04d})".format(branch, commit,
time.strftime("%Y/%m/%d %H:%M:%S", date), int(offset / -36))
else:
result = ''
return result
@staticmethod
def _gitinfo():
basedir = os.path.join(os.path.dirname(__file__), '..', '..', '..')
repo_path = os.path.join(basedir, '.git')
result = CLI._git_repo_info(repo_path)
submodules = os.path.join(basedir, '.gitmodules')
if not os.path.exists(submodules):
return result
f = open(submodules)
for line in f:
tokens = line.strip().split(' ')
if tokens[0] == 'path':
submodule_path = tokens[2]
submodule_info = CLI._git_repo_info(os.path.join(basedir, submodule_path, '.git'))
if not submodule_info:
submodule_info = ' not found - use git submodule update --init ' + submodule_path
result += "\n {0}: {1}".format(submodule_path, submodule_info)
f.close()
return result
def pager(self, text):
''' find reasonable way to display text '''
# this is a much simpler form of what is in pydoc.py
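# Preference order: plain output when stdout is not a tty, then $PAGER if it
# is set (except on win32), then 'less' if it is installed, otherwise plain
# output.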
if not sys.stdout.isatty():
display.display(text)
elif 'PAGER' in os.environ:
if sys.platform == 'win32':
display.display(text)
else:
self.pager_pipe(text, os.environ['PAGER'])
else:
p = subprocess.Popen('less --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.communicate()
if p.returncode == 0:
self.pager_pipe(text, 'less')
else:
display.display(text)
@staticmethod
def pager_pipe(text, cmd):
''' pipe text through a pager '''
if 'LESS' not in os.environ:
os.environ['LESS'] = CLI.LESS_OPTS
try:
cmd = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=sys.stdout)
cmd.communicate(input=to_bytes(text))
except IOError:
pass
except KeyboardInterrupt:
pass
@classmethod
def tty_ify(cls, text):
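# Example (illustrative): tty_ify("use M(copy), see U(http://docs.ansible.com)")
# returns "use [copy], see http://docs.ansible.com".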
t = cls._ITALIC.sub("`" + r"\1" + "'", text) # I(word) => `word'
t = cls._BOLD.sub("*" + r"\1" + "*", t) # B(word) => *word*
t = cls._MODULE.sub("[" + r"\1" + "]", t) # M(word) => [word]
t = cls._URL.sub(r"\1", t) # U(word) => word
t = cls._CONST.sub("`" + r"\1" + "'", t) # C(word) => `word'
return t
@staticmethod
def read_vault_password_file(vault_password_file, loader):
"""
Read a vault password from a file or if executable, execute the script and
retrieve password from STDOUT
"""
this_path = os.path.realpath(os.path.expanduser(vault_password_file))
if not os.path.exists(this_path):
raise AnsibleError("The vault password file %s was not found" % this_path)
if loader.is_executable(this_path):
try:
# STDERR not captured to make it easier for users to prompt for input in their scripts
p = subprocess.Popen(this_path, stdout=subprocess.PIPE)
except OSError as e:
raise AnsibleError("Problem running vault password script %s (%s). If this is not a script, remove the executable bit from the file." % (' '.join(this_path), e))
stdout, stderr = p.communicate()
if p.returncode != 0:
raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (this_path, p.returncode, p.stderr))
vault_pass = stdout.strip(b'\r\n')
else:
try:
f = open(this_path, "rb")
vault_pass=f.read().strip()
f.close()
except (OSError, IOError) as e:
raise AnsibleError("Could not read vault password file %s: %s" % (this_path, e))
return vault_pass
def get_opt(self, k, defval=""):
"""
Returns an option from an Optparse values instance.
"""
try:
data = getattr(self.options, k)
except:
return defval
# FIXME: Can this be removed if cli and/or constants ensures it's a
# list?
if k == "roles_path":
if os.pathsep in data:
data = data.split(os.pathsep)[0]
return data
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ansible import constants as C
from ansible.compat.six import string_types
from ansible.errors import AnsibleParserError, AnsibleUndefinedVariable, AnsibleFileNotFound
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
def load_list_of_blocks(ds, play, parent_block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None):
'''
Given a list of mixed task/block data (parsed from YAML),
return a list of Block() objects, where implicit blocks
are created for each bare Task.
'''
# we import here to prevent a circular dependency with imports
from ansible.playbook.block import Block
from ansible.playbook.task_include import TaskInclude
from ansible.playbook.role_include import IncludeRole
assert isinstance(ds, (list, type(None)))
block_list = []
if ds:
for block_ds in ds:
b = Block.load(
block_ds,
play=play,
parent_block=parent_block,
role=role,
task_include=task_include,
use_handlers=use_handlers,
variable_manager=variable_manager,
loader=loader,
)
# Implicit blocks are created by bare tasks listed in a play without
# an explicit block statement. If we have two implicit blocks in a row,
# squash them down to a single block to save processing time later.
if b._implicit and len(block_list) > 0 and block_list[-1]._implicit:
for t in b.block:
if isinstance(t._parent, (TaskInclude, IncludeRole)):
t._parent._parent = block_list[-1]
else:
t._parent = block_list[-1]
block_list[-1].block.extend(b.block)
else:
block_list.append(b)
return block_list
def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None):
'''
Given a list of task datastructures (parsed from YAML),
return a list of Task() or TaskInclude() objects.
'''
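# Each entry in ds is handled as one of: a nested block ('block' key), a task
# include ('include' key), a role include ('include_role' key), or a plain
# Task/Handler.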
# we import here to prevent a circular dependency with imports
from ansible.playbook.block import Block
from ansible.playbook.handler import Handler
from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
from ansible.playbook.role_include import IncludeRole
from ansible.playbook.handler_task_include import HandlerTaskInclude
from ansible.template import Templar
assert isinstance(ds, list)
display.display(u"DEBUG: Entered {}".format(__name__))
display.display(u"DEBUG: task_ds is {}".format(ds))
task_list = []
for task_ds in ds:
assert isinstance(task_ds, dict)
if 'block' in task_ds:
t = Block.load(
task_ds,
play=play,
parent_block=block,
role=role,
task_include=task_include,
use_handlers=use_handlers,
variable_manager=variable_manager,
loader=loader,
)
task_list.append(t)
else:
if 'include' in task_ds:
if use_handlers:
include_class = HandlerTaskInclude
else:
include_class = TaskInclude
t = include_class.load(
task_ds,
block=block,
role=role,
task_include=None,
variable_manager=variable_manager,
loader=loader
)
all_vars = variable_manager.get_vars(loader=loader, play=play, task=t)
templar = Templar(loader=loader, variables=all_vars)
# check to see if this include is dynamic or static:
# 1. the user has set the 'static' option to false or true
# 2. one of the appropriate config options was set
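# 3. otherwise the include is treated as static when its target contains no
#    variables, all of its parent includes are static and it has no loop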
if t.static is not None:
is_static = t.static
else:
is_static = C.DEFAULT_TASK_INCLUDES_STATIC or \
(use_handlers and C.DEFAULT_HANDLER_INCLUDES_STATIC) or \
(not templar._contains_vars(t.args['_raw_params']) and t.all_parents_static() and not t.loop)
if is_static:
if t.loop is not None:
raise AnsibleParserError("You cannot use 'static' on an include with a loop", obj=task_ds)
# we set a flag to indicate this include was static
t.statically_loaded = True
# handle relative includes by walking up the list of parent include
# tasks and checking the relative result to see if it exists
parent_include = block
cumulative_path = None
found = False
subdir = 'tasks'
if use_handlers:
subdir = 'handlers'
while parent_include is not None:
if not isinstance(parent_include, TaskInclude):
parent_include = parent_include._parent
continue
parent_include_dir = templar.template(os.path.dirname(parent_include.args.get('_raw_params')))
if cumulative_path is None:
cumulative_path = parent_include_dir
elif not os.path.isabs(cumulative_path):
cumulative_path = os.path.join(parent_include_dir, cumulative_path)
include_target = templar.template(t.args['_raw_params'])
if t._role:
new_basedir = os.path.join(t._role._role_path, subdir, cumulative_path)
include_file = loader.path_dwim_relative(new_basedir, subdir, include_target)
else:
include_file = loader.path_dwim_relative(loader.get_basedir(), cumulative_path, include_target)
if os.path.exists(include_file):
found = True
break
else:
parent_include = parent_include._parent
if not found:
try:
include_target = templar.template(t.args['_raw_params'])
except AnsibleUndefinedVariable:
raise AnsibleParserError(
"Error when evaluating variable in include name: %s.\n\n" \
"When using static includes, ensure that any variables used in their names are defined in vars/vars_files\n" \
"or extra-vars passed in from the command line. Static includes cannot use variables from inventory\n" \
"sources like group or host vars." % t.args['_raw_params'],
obj=task_ds,
suppress_extended_error=True,
)
if t._role:
include_file = loader.path_dwim_relative(t._role._role_path, subdir, include_target)
else:
include_file = loader.path_dwim(include_target)
try:
display.display(u"DEBUG: trying to load tasks from disk")
data = loader.load_from_file(include_file)
if data is None:
return []
elif not isinstance(data, list):
raise AnsibleParserError("included task files must contain a list of tasks", obj=data)
# since we can't send callbacks here, we display a message directly in
# the same fashion used by the on_include callback. We also do it here,
# because the recursive nature of helper methods means we may be loading
# nested includes, and we want the include order printed correctly
display.display(u"DEBUG: statically included: %s" % include_file)
display.vv("statically included: %s" % include_file)
except AnsibleFileNotFound:
if t.static or \
C.DEFAULT_TASK_INCLUDES_STATIC or \
(C.DEFAULT_HANDLER_INCLUDES_STATIC and use_handlers):
raise
display.deprecated(
"Included file '%s' not found, however since this include is not " \
"explicitly marked as 'static: yes', we will try and include it dynamically " \
"later. In the future, this will be an error unless 'static: no' is used " \
"on the include task. If you do not want missing includes to be considered " \
"dynamic, use 'static: yes' on the include or set the global ansible.cfg " \
"options to make all inclues static for tasks and/or handlers" % include_file,
)
task_list.append(t)
continue
ti_copy = t.copy(exclude_parent=True)
ti_copy._parent = block
included_blocks = load_list_of_blocks(
data,
play=play,
parent_block=None,
task_include=ti_copy,
role=role,
use_handlers=use_handlers,
loader=loader,
variable_manager=variable_manager,
)
# pop tags out of the include args, if they were specified there, and assign
# them to the include. If the include already had tags specified, we raise an
# error so that users know not to specify them both ways
tags = ti_copy.vars.pop('tags', [])
if isinstance(tags, string_types):
tags = tags.split(',')
if len(tags) > 0:
if len(ti_copy.tags) > 0:
raise AnsibleParserError(
"Include tasks should not specify tags in more than one way (both via args and directly on the task). " \
"Mixing styles in which tags are specified is prohibited for whole import hierarchy, not only for single import statement",
obj=task_ds,
suppress_extended_error=True,
)
display.deprecated("You should not specify tags in the include parameters. All tags should be specified using the task-level option")
else:
tags = ti_copy.tags[:]
# now we extend the tags on each of the included blocks
for b in included_blocks:
b.tags = list(set(b.tags).union(tags))
# END FIXME
# FIXME: handlers shouldn't need this special handling, but do
# right now because they don't iterate blocks correctly
if use_handlers:
for b in included_blocks:
task_list.extend(b.block)
else:
task_list.extend(included_blocks)
else:
task_list.append(t)
elif 'include_role' in task_ds:
ir = IncludeRole.load(
task_ds,
block=block,
role=role,
task_include=None,
variable_manager=variable_manager,
loader=loader
)
# 1. the user has set the 'static' option to false or true
# 2. one of the appropriate config options was set
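# 3. otherwise it is static when none of its arguments need templating, all
#    of its parent includes are static and it has no loop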
if ir.static is not None:
is_static = ir.static
else:
display.debug('Determining whether include_role is static')
# Check to see if this include is dynamic or static:
all_vars = variable_manager.get_vars(loader=loader, play=play, task=ir)
templar = Templar(loader=loader, variables=all_vars)
needs_templating = False
for param in ir.args:
if templar._contains_vars(ir.args[param]):
if not templar.templatable(ir.args[param]):
needs_templating = True
break
is_static = C.DEFAULT_TASK_INCLUDES_STATIC or \
(use_handlers and C.DEFAULT_HANDLER_INCLUDES_STATIC) or \
(not needs_templating and ir.all_parents_static() and not ir.loop)
display.debug('Determined that include_role static is %s' % str(is_static))
if is_static:
# uses compiled list from object
t = task_list.extend(ir.get_block_list(variable_manager=variable_manager, loader=loader))
else:
# passes the task object itself for later generation of the list
t = task_list.append(ir)
else:
if use_handlers:
t = Handler.load(task_ds, block=block, role=role, task_include=task_include, variable_manager=variable_manager, loader=loader)
else:
t = Task.load(task_ds, block=block, role=role, task_include=task_include, variable_manager=variable_manager, loader=loader)
task_list.append(t)
return task_list
def load_list_of_roles(ds, play, current_role_path=None, variable_manager=None, loader=None):
'''
Loads and returns a list of RoleInclude objects from the datastructure
list of role definitions
'''
# we import here to prevent a circular dependency with imports
from ansible.playbook.role.include import RoleInclude
assert isinstance(ds, list)
roles = []
for role_def in ds:
i = RoleInclude.load(role_def, play=play, current_role_path=current_role_path, variable_manager=variable_manager, loader=loader)
roles.append(i)
return roles
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ansible import constants as C
from ansible.errors import AnsibleParserError
from ansible.module_utils._text import to_text
from ansible.playbook.play import Play
from ansible.playbook.playbook_include import PlaybookInclude
from ansible.plugins import get_all_plugin_loaders
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
__all__ = ['Playbook']
class Playbook:
def __init__(self, loader):
# Entries in the datastructure of a playbook may
# be either a play or an include statement
self._entries = []
self._basedir = to_text(os.getcwd(), errors='surrogate_or_strict')
self._loader = loader
self._file_name = None
@staticmethod
def load(file_name, variable_manager=None, loader=None):
pb = Playbook(loader=loader)
pb._load_playbook_data(file_name=file_name, variable_manager=variable_manager)
return pb
def _load_playbook_data(self, file_name, variable_manager):
if os.path.isabs(file_name):
self._basedir = os.path.dirname(file_name)
else:
self._basedir = os.path.normpath(os.path.join(self._basedir, os.path.dirname(file_name)))
# set the loaders basedir
cur_basedir = self._loader.get_basedir()
self._loader.set_basedir(self._basedir)
self._file_name = file_name
# dynamically load any plugins from the playbook directory
for name, obj in get_all_plugin_loaders():
if obj.subdir:
plugin_path = os.path.join(self._basedir, obj.subdir)
if os.path.isdir(plugin_path):
obj.add_directory(plugin_path)
ds = self._loader.load_from_file(os.path.basename(file_name))
if not isinstance(ds, list):
# restore the basedir in case this error is caught and handled
self._loader.set_basedir(cur_basedir)
raise AnsibleParserError("playbooks must be a list of plays", obj=ds)
# Parse the playbook entries. For plays, we simply parse them
# using the Play() object, and includes are parsed using the
# PlaybookInclude() object
for entry in ds:
if not isinstance(entry, dict):
# restore the basedir in case this error is caught and handled
self._loader.set_basedir(cur_basedir)
raise AnsibleParserError("playbook entries must be either a valid play or an include statement", obj=entry)
if 'include' in entry:
pb = PlaybookInclude.load(entry, basedir=self._basedir, variable_manager=variable_manager, loader=self._loader)
if pb is not None:
self._entries.extend(pb._entries)
else:
display.display("skipping playbook include '%s' due to conditional test failure" % entry.get('include', entry), color=C.COLOR_SKIP)
else:
entry_obj = Play.load(entry, variable_manager=variable_manager, loader=self._loader)
self._entries.append(entry_obj)
# we're done, so restore the old basedir in the loader
self._loader.set_basedir(cur_basedir)
def get_loader(self):
display.display(u"DEBUG: Entered {}".format(__name__))
return self._loader
def get_plays(self):
display.display(u"DEBUG: Entered {}".format(__name__))
return self._entries[:]