Skip to content

Instantly share code, notes, and snippets.

@pinkeen
Created December 11, 2019 05:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pinkeen/306d51b460511680120ef5c92a080ff8 to your computer and use it in GitHub Desktop.
Save pinkeen/306d51b460511680120ef5c92a080ff8 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import re
import sys
import json
import datetime
import itertools
import traceback
import configparser
import itertools
# boto3 is the only third-party dependency; without it the script cannot do
# anything useful, so report the problem on stderr and exit with status 5.
try:
    import boto3
except ImportError:
    print('# This script requires boto3 library!', file=sys.stderr)
    sys.exit(5)
else:
    HAS_BOTO3 = True
# Tag key whose value is used directly as a group name (see get_groups_for_tag).
ROLE_TAG = 'Role'
# Not referenced by the code visible in this file.
ENVIRONMENT_TAG = 'AppName'

# Filters handed to get_instances() by main(): filter name -> accepted values.
FILTERS = {
    'instance-state-name': ['running', 'pending'],
}

# Not referenced by the code visible in this file.
TAG_FILTERS = {
    'Infrastructure': 'mageops',
    'Project': 'creativeshop',
    'Environment': 'proto',
}

# The two passes used by snakecase() to insert "_" at case boundaries.
_to_snakecase_re_a = re.compile(r'((?:^|[a-z0-9])[A-Z])([A-Z](?:[a-z0-9]|$))')
_to_snakecase_re_b = re.compile(r'(?:([a-z0-9])([A-Z]))')

# Runs of characters other than a-z, "_" and ":"; to_safe_name() replaces
# each run with "_".
# NOTE(review): digits are not in the allowed set, so they are replaced too --
# confirm that this is intended.
_to_safe_name_re = re.compile(r'[^a-z_:]+')

# Flatten one level of nesting: an iterable of iterables -> a lazy iterator.
flatten = itertools.chain.from_iterable
def get_groups_for_tag(key, value):
    """Return the raw group names contributed by a single tag.

    * A tag whose key equals ``ROLE_TAG`` contributes its value.
    * A tag whose key, split into lowercase words at case boundaries, begins
      with the safe-name form of ``ROLE_TAG`` contributes two groups: the
      remaining words joined with ``_`` and the tag value.
    * Any other tag contributes nothing.
    """
    if key == ROLE_TAG:
        return [value]

    words = split_by_case(key)
    role_prefix = to_safe_name(ROLE_TAG)
    if words[0] != role_prefix:
        return []

    return ['_'.join(words[1:]), value]
def get_groups_for_tags(tags):
    """Return a flat iterator over the group names from every tag in *tags*.

    *tags* is a mapping of tag key to tag value; each pair is passed to
    ``get_groups_for_tag`` and the per-tag lists are chained together.
    """
    per_tag_groups = []
    for tag_key, tag_value in tags.items():
        per_tag_groups.append(get_groups_for_tag(tag_key, tag_value))
    return flatten(per_tag_groups)
def get_groups_for_instance(instance):
    """Return the safe group names derived from the tags of *instance*.

    An instance without a ``'tags'`` entry yields no groups. The result is
    whatever ``map`` returns (a lazy iterator on Python 3).
    """
    if 'tags' in instance:
        raw_groups = list(get_groups_for_tags(instance['tags']))
    else:
        raw_groups = []
    return map(to_safe_name, raw_groups)
def get_hostname_for_instance(instance):
    """Return the best address for reaching *instance*, or ``None``.

    The candidates are tried in order of preference: public DNS name, public
    IP address, private IP address. The first non-empty one wins.

    A field that is present but empty is skipped rather than returned: a bare
    presence check would hand back ``''`` as the hostname for an instance
    whose public DNS name is reported as an empty string.
    """
    preferred_fields = (
        'public_dns_name',
        'public_ip_address',
        'private_ip_address',
    )
    for field in preferred_fields:
        address = instance.get(field)
        if address:
            return address
    return None
def remap_keys(object=None, keymap=None):
    """Return a copy of *object* with keys renamed according to *keymap*.

    Keys that have no entry in *keymap* are kept unchanged. Both arguments
    default to "empty"; ``None`` replaces the former shared mutable ``{}``
    defaults. The parameter name ``object`` shadows the builtin but is kept
    so keyword callers continue to work.
    """
    source = {} if object is None else object
    renames = {} if keymap is None else keymap
    return {renames.get(key, key): source[key] for key in source}
def pick_keys(object=None, keys=None):
    """Return a dict holding only the entries of *object* named in *keys*.

    Names in *keys* that are missing from *object* are ignored. Both
    arguments default to "empty"; ``None`` replaces the former shared mutable
    defaults. The parameter name ``object`` shadows the builtin but is kept
    so keyword callers continue to work.
    """
    source = {} if object is None else object
    wanted = [] if keys is None else keys
    return {key: source[key] for key in wanted if key in source}
def reject_keys(object=None, keys=None):
    """Return a copy of *object* without the entries named in *keys*.

    Both arguments default to "empty"; ``None`` replaces the former shared
    mutable defaults. The parameter name ``object`` shadows the builtin but
    is kept so keyword callers continue to work.
    """
    source = {} if object is None else object
    unwanted = [] if keys is None else keys
    return {key: source[key] for key in source if key not in unwanted}
def prefix_keys(object=None, prefix=''):
    """Return a copy of *object* with *prefix* prepended to every key.

    *object* defaults to "empty"; ``None`` replaces the former shared mutable
    ``{}`` default. The parameter name ``object`` shadows the builtin but is
    kept so keyword callers continue to work.
    """
    source = {} if object is None else object
    return {prefix + key: source[key] for key in source}
def pick_prefixed_keys(object=None, prefix=''):
    """Return the entries of *object* whose key starts with *prefix*.

    With the default empty prefix every entry is kept. *object* defaults to
    "empty"; ``None`` replaces the former shared mutable ``{}`` default. The
    parameter name ``object`` shadows the builtin but is kept so keyword
    callers continue to work.
    """
    source = {} if object is None else object
    return {key: source[key] for key in source if key.startswith(prefix)}
def dict_to_filters(d):
    """Turn a ``{name: values}`` mapping into a list of filter dicts.

    Each entry becomes ``{'Name': name, 'Values': values}``; the values are
    passed through untouched.
    """
    filters = []
    for name, values in d.items():
        filters.append({'Name': name, 'Values': values})
    return filters
def tags_to_dict(l):
    """Collapse a list of ``{'key': ..., 'value': ...}`` dicts into one dict.

    Items lacking either ``'key'`` or ``'value'`` are skipped. When a key
    occurs more than once, the last occurrence wins.
    """
    result = {}
    for tag in l:
        if 'key' not in tag or 'value' not in tag:
            continue
        result[tag['key']] = tag['value']
    return result
def snakecase(s):
    """Convert a CamelCase string to lower snake_case.

    Two regex passes insert ``_`` at case boundaries, then the result is
    lowercased. For example ``'AppName'`` becomes ``'app_name'``.
    """
    # Pass A separates two adjacent capitals; pass B separates a lowercase
    # letter or digit from the capital that follows it.
    after_pass_a = _to_snakecase_re_a.sub(r'\1_\2', s)
    after_pass_b = _to_snakecase_re_b.sub(r'\1_\2', after_pass_a)
    return after_pass_b.lower()
def split_by_case(s):
    """Split *s* into lowercase words at its case boundaries.

    The string is converted with ``snakecase`` and then split on ``_``.
    """
    snake = snakecase(s)
    return snake.split('_')
def to_safe_name(s):
    """Lowercase *s* and replace each run of disallowed characters with ``_``.

    What counts as disallowed is defined by ``_to_safe_name_re``.
    """
    lowered = s.lower()
    return _to_safe_name_re.sub('_', lowered)
def normalize_keys(d):
    """Recursively rewrite every dict key in *d* as a safe snake_case name.

    * dicts (including subclasses) become plain dicts whose keys have gone
      through ``snakecase`` and ``to_safe_name``; values are processed
      recursively;
    * lists and tuples (including subclasses) become lists of processed items;
    * anything else is returned unchanged.

    ``isinstance`` is used instead of exact ``type(...) is`` comparisons so
    that subclasses of the container types are normalized as well.
    """
    if isinstance(d, dict):
        return {
            to_safe_name(snakecase(key)): normalize_keys(value)
            for key, value in d.items()
        }
    if isinstance(d, (list, tuple)):
        return [normalize_keys(item) for item in d]
    return d
def normalize_instance(instance):
    """Return *instance* with normalized keys and its tags as a plain dict.

    Keys are rewritten with ``normalize_keys``. If the result has a
    ``'tags'`` entry, it is replaced by the output of ``tags_to_dict``.
    """
    normalized = normalize_keys(instance)
    if 'tags' not in normalized:
        return normalized
    normalized.update(tags=tags_to_dict(normalized['tags']))
    return normalized
def get_instances(filters=None):
    """Fetch EC2 instances matching *filters* and return them normalized.

    *filters* maps a filter name to a list of accepted values (see
    ``dict_to_filters``); ``None`` means no filtering. Every page of the
    ``describe_instances`` paginator is walked, and each instance of each
    reservation is passed through ``normalize_instance``.

    Returns a list, so callers may iterate it more than once and any API
    error is raised here rather than at first iteration.
    """
    # The client is created from the session built here; previously the
    # session was created but then ignored.
    session = boto3.Session()
    paginator = session.client('ec2').get_paginator('describe_instances')
    pages = paginator.paginate(Filters=dict_to_filters(filters or {}))

    instances = []
    for page in pages:
        for reservation in page['Reservations']:
            instances.extend(
                normalize_instance(raw) for raw in reservation['Instances']
            )
    return instances
def serialize(o):
    """``json.dumps`` fallback: render dates and datetimes as ISO 8601 text.

    Any other object yields ``None``.
    """
    # datetime.datetime is a subclass of datetime.date, so a single check
    # covers both types.
    if not isinstance(o, datetime.date):
        return None
    return o.isoformat()
def build_inventory(instances):
    """Build an Ansible dynamic-inventory mapping from normalized instances.

    The result has the shape of Ansible's inventory-script JSON output::

        {
            '<group>': {'hosts': [hostname, ...]},
            'all': {'hosts': [every hostname], 'children': [other groups]},
            '_meta': {'hostvars': {hostname: instance, ...}},
        }

    Hostnames come from ``get_hostname_for_instance`` and group names from
    ``get_groups_for_instance``. Instances without a usable hostname are
    skipped, as are empty group names.
    """
    host_vars = {}
    host_groups = {}

    for instance in instances:
        hostname = get_hostname_for_instance(instance)
        if not hostname:
            # Nothing to connect to; a None key would also break the sorted
            # JSON output.
            continue
        host_vars[hostname] = instance

        for group_name in get_groups_for_instance(instance):
            if not group_name:
                continue
            members = host_groups.setdefault(group_name, {'hosts': []})['hosts']
            # Several tags may map one host to the same group.
            if hostname not in members:
                members.append(hostname)

    # Every host is listed under 'all' so that hosts without any role tag
    # still appear in a group; the tag-derived groups become its children.
    child_groups = sorted(name for name in host_groups if name != 'all')
    host_groups['all'] = {'hosts': sorted(host_vars), 'children': child_groups}
    host_groups['_meta'] = {'hostvars': host_vars}
    return host_groups
def inventory_to_json(inventory):
    """Render *inventory* as JSON with sorted keys and 4-space indentation.

    Objects that JSON cannot encode natively are passed to ``serialize``.
    """
    dump_options = {
        'sort_keys': True,
        'indent': 4,
        'default': serialize,
    }
    return json.dumps(inventory, **dump_options)
def invnetory_to_init(inventory):
    """Render *inventory* as INI-style inventory text and return it.

    *inventory* is expected to map group names to either a list of hostnames
    or a dict with optional ``'hosts'`` and ``'children'`` lists; a ``'_meta'``
    entry is ignored. Each group becomes a ``[group]`` section listing its
    hosts one per line, and a group with children additionally gets a
    ``[group:children]`` section. Groups are written in sorted order.

    The misspelled function name is kept for backward compatibility.
    Previously the function built a parser and returned ``None``.
    """
    import io

    # allow_no_value lets a bare hostname be written as a line of its own;
    # interpolation is disabled so a "%" in a name is not given a meaning.
    ini = configparser.ConfigParser(allow_no_value=True, interpolation=None)
    # Keep names exactly as given instead of lowercasing them.
    ini.optionxform = str

    for group_name in sorted(inventory):
        if group_name == '_meta':
            continue
        group = inventory[group_name]
        if isinstance(group, dict):
            hosts = group.get('hosts', [])
            children = group.get('children', [])
        else:
            hosts, children = group, []

        ini.add_section(group_name)
        for hostname in hosts:
            ini.set(group_name, hostname, None)

        if children:
            children_section = group_name + ':children'
            ini.add_section(children_section)
            for child in children:
                ini.set(children_section, child, None)

    buffer = io.StringIO()
    ini.write(buffer)
    return buffer.getvalue()
def main():
    """Fetch the instances matching ``FILTERS`` and print the inventory as JSON."""
    instances = get_instances(filters=FILTERS)
    inventory = build_inventory(instances)
    # Emit the inventory itself; previously only an empty line was printed.
    print(inventory_to_json(inventory))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment