Skip to content

Instantly share code, notes, and snippets.

@mohag
Created May 7, 2024 14:29
Show Gist options
  • Save mohag/eac4eeb5a4e6977df1b5a73375978696 to your computer and use it in GitHub Desktop.
Save mohag/eac4eeb5a4e6977df1b5a73375978696 to your computer and use it in GitHub Desktop.
AWS EKS inventory plugin for Ansible
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Derived from the aws_rds inventory plugin
DOCUMENTATION = r"""
name: aws_eks
short_description: EKS cluster inventory source
description:
- Get clusters from Amazon Web Services EKS.
- Uses a YAML configuration file that ends with aws_eks.(yml|yaml).
options:
regions:
description:
- A list of AWS regions in which to describe EKS clusters.
default: []
strict_permissions:
description:
- By default if an AccessDenied exception is encountered this plugin will fail. You can set strict_permissions to
False in the inventory config file which will allow the restrictions to be gracefully skipped.
type: bool
default: True
clusters_to_include:
description:
- Which clusters to include in the inventory. Set to ['all'] as a shorthand to include everything. (if blank, external clusters are not included)
See the "include" option in the U(https://docs.aws.amazon.com/eks/latest/APIReference/API_ListClusters.html#API_ListClusters_RequestSyntax)
API for possible values.
type: list
elements: str
default:
- all
statuses:
description:
- A list of desired states for instances/clusters to be added to inventory. Set to ['all'] as a shorthand to find everything.
See U(https://docs.aws.amazon.com/eks/latest/APIReference/API_Cluster.html#AmazonEKS-Type-Cluster-status) for possible statuses.
type: list
elements: str
default:
- all
hostvars_prefix:
description:
- The prefix for host variables names coming from AWS.
type: str
hostvars_suffix:
description:
- The suffix for host variables names coming from AWS.
type: str
notes:
- Ansible versions prior to 2.10 should use the fully qualified plugin name 'community.aws.aws_eks'.
extends_documentation_fragment:
- inventory_cache
- constructed
- amazon.aws.boto3
- amazon.aws.common.plugins
- amazon.aws.region.plugins
- amazon.aws.assume_role.plugins
author:
- Gert van den Berg (@mohag)
"""
EXAMPLES = r"""
plugin: aws_eks
regions:
- af-south-1
- eu-central-1
- eu-north-1
- us-east-1
keyed_groups:
- key: tags
prefix: tag
- key: region
prefix: region
- key: status
prefix: status
hostvars_prefix: aws_
hostvars_suffix: _eks
"""
try:
import botocore
except ImportError:
pass # will be captured by imported HAS_BOTO3
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native
from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict
from ansible_collections.amazon.aws.plugins.module_utils.botocore import is_boto3_error_code
from ansible_collections.amazon.aws.plugins.plugin_utils.inventory import AWSInventoryBase
def _find_clusters_with_valid_statuses(clusters, statuses):
if "all" in statuses:
return clusters
valid_clusters = []
for cluster in clusters:
if cluster.get("status") in statuses:
valid_clusters.append(cluster)
return valid_clusters
def describe_resource_with_tags(func):
def describe_wrapper(connection, region, include, strict=False):
try:
results = func(connection=connection, region=region, include=include)
except is_boto3_error_code("AccessDenied") as e: # pylint: disable=duplicate-except
if not strict:
return []
raise AnsibleError(f"Failed to query EKS: {to_native(e)}")
except (
botocore.exceptions.BotoCoreError,
botocore.exceptions.ClientError,
) as e: # pylint: disable=duplicate-except
raise AnsibleError(f"Failed to query EKS: {to_native(e)}")
return results
return describe_wrapper
@describe_resource_with_tags
def _describe_eks_clusters(connection, region, include):
paginator = connection.get_paginator("list_clusters")
eks_clusters_list = paginator.paginate(include=include).build_full_result()
eks_clusters = []
for cluster in eks_clusters_list["clusters"]:
cluster_info = connection.describe_cluster(name=cluster)
# Add the region for grouping later (we don't have an easy field to extract it from)
cluster_info["cluster"]["region"] = region
# TODO: If filters is implemented, they likely need to be checked here. EKS APIs does not have a filter option
eks_clusters.append(cluster_info["cluster"])
return eks_clusters
class InventoryModule(AWSInventoryBase):
NAME = "community.aws.aws_eks"
INVENTORY_FILE_SUFFIXES = ("aws_eks.yml", "aws_eks.yaml")
def __init__(self):
super().__init__()
self.credentials = {}
def _populate(self, hosts):
group = "aws_eks"
self.inventory.add_group(group)
if hosts:
self._add_hosts(hosts=hosts, group=group)
self.inventory.add_child("all", group)
# This is for caching
def _populate_from_source(self, source_data):
hostvars = source_data.pop("_meta", {}).get("hostvars", {})
for group in source_data:
if group == "all":
continue
self.inventory.add_group(group)
hosts = source_data[group].get("hosts", [])
for host in hosts:
self._populate_host_vars([host], hostvars.get(host, {}), group)
self.inventory.add_child("all", group)
def _format_inventory(self, hosts):
results = {"_meta": {"hostvars": {}}}
group = "aws_eks"
results[group] = {"hosts": []}
for host in hosts:
hostname = host["name"]
results[group]["hosts"].append(hostname)
h = self.inventory.get_host(hostname)
results["_meta"]["hostvars"][h.name] = h.vars
return results
def _add_hosts(self, hosts, group):
"""
:param hosts: a list of hosts to be added to a group
:param group: the name of the group to which the hosts belong
"""
for host in hosts:
hostname = host["name"]
host = camel_dict_to_snake_dict(host, ignore_list=["tags"])
self.inventory.add_host(hostname, group=group)
hostvars_prefix = self.get_option("hostvars_prefix")
hostvars_suffix = self.get_option("hostvars_suffix")
new_vars = dict()
for hostvar, hostval in host.items():
if hostvars_prefix:
hostvar = hostvars_prefix + hostvar
if hostvars_suffix:
hostvar = hostvar + hostvars_suffix
new_vars[hostvar] = hostval
self.inventory.set_variable(hostname, hostvar, hostval)
host.update(new_vars)
# Use constructed if applicable
strict = self.get_option("strict")
# Composed variables
self._set_composite_vars(self.get_option("compose"), host, hostname, strict=strict)
# Complex groups based on jinja2 conditionals, hosts that meet the conditional are added to group
self._add_host_to_composed_groups(self.get_option("groups"), host, hostname, strict=strict)
# Create groups based on variable values and add the corresponding hosts to it
self._add_host_to_keyed_groups(self.get_option("keyed_groups"), host, hostname, strict=strict)
def _get_all_eks_clusters(self, strict, statuses, include):
"""
:param regions: a list of regions in which to describe EKS clusters
:param strict: a boolean determining whether to fail or ignore 403 error codes
:param statuses: a list of statuses that the returned clusters should match
:return A list of cluster dictionaries
"""
all_clusters = []
for connection, _region in self.all_clients("eks"):
all_clusters += _describe_eks_clusters(connection, _region, include=include, strict=strict)
sorted_clusters = list(
sorted(all_clusters, key=lambda x: x["name"])
)
return _find_clusters_with_valid_statuses(sorted_clusters, statuses)
def parse(self, inventory, loader, path, cache=True):
super().parse(inventory, loader, path, cache=cache)
# get user specifications (and defaults)
strict_permissions = self.get_option("strict_permissions")
statuses = self.get_option("statuses")
include = self.get_option("clusters_to_include")
result_was_cached, cached_result = self.get_cached_result(path, cache)
if result_was_cached:
self._populate_from_source(cached_result)
return
results = self._get_all_eks_clusters(
strict_permissions,
statuses,
include,
)
self._populate(results)
# Update the cache once we're done
formatted_inventory = self._format_inventory(results)
self.update_cached_result(path, cache, formatted_inventory)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment