Skip to content

Instantly share code, notes, and snippets.

@daihu
Forked from philpep/ansible_v2.py
Created April 18, 2018 09:45
Show Gist options
  • Save daihu/a2fe3d0a805656c51c169b45457b0cda to your computer and use it in GitHub Desktop.
Save daihu/a2fe3d0a805656c51c169b45457b0cda to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import print_function
import ansible.executor.task_queue_manager
import ansible.inventory
import ansible.parsing.dataloader
import ansible.playbook.play
import ansible.plugins.callback
import ansible.vars
class Options(object):
def __init__(self, check=True):
self.connection = "smart"
self.module_path = None
self.forks = None
self.remote_user = None
self.private_key_file = None
self.ssh_common_args = None
self.ssh_extra_args = None
self.sftp_extra_args = None
self.scp_extra_args = None
self.become = None
self.become_method = None
self.become_user = None
self.verbosity = None
self.check = check
super(Options, self).__init__()
class Callback(ansible.plugins.callback.CallbackBase):
def __init__(self):
self.unreachable = {}
self.contacted = {}
def runner_on_ok(self, host, result):
self.contacted[host] = {
"success": True,
"result": result,
}
def runner_on_failed(self, host, result, ignore_errors=False):
self.contacted[host] = {
"success": False,
"result": result,
}
def runner_on_unreachable(self, host, result):
self.unreachable[host] = result
class AnsibleRunnerV2(object):
def __init__(self, host_list=None):
self.variable_manager = ansible.vars.VariableManager()
self.loader = ansible.parsing.dataloader.DataLoader()
self.inventory = ansible.inventory.Inventory(
loader=self.loader,
variable_manager=self.variable_manager,
host_list=host_list,
)
self.variable_manager.set_inventory(self.inventory)
super(AnsibleRunnerV2, self).__init__()
def get_hosts(self, pattern=None):
return [
e.name for e in self.inventory.get_hosts(pattern=pattern or "all")
]
def run(self, host, module_name, module_args, **kwargs):
play = ansible.playbook.play.Play().load({
"hosts": host,
"gather_facts": "no",
"tasks": [{
"action": {"module": module_name, "args": module_args},
}],
}, variable_manager=self.variable_manager, loader=self.loader)
tqm = None
options = Options(check=False)
callback = Callback()
try:
tqm = ansible.executor.task_queue_manager.TaskQueueManager(
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=options,
passwords=None,
stdout_callback=callback,
)
tqm.run(play)
finally:
if tqm is not None:
tqm.cleanup()
return callback.contacted[host]["result"]
print(AnsibleRunnerV2("staging").run("host", "shell", "hostname"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment