Skip to content

Instantly share code, notes, and snippets.

@gyoza
Last active October 9, 2019 00:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gyoza/fc5a9b17cf3de48fa2b879f627b2ce2a to your computer and use it in GitHub Desktop.
Save gyoza/fc5a9b17cf3de48fa2b879f627b2ce2a to your computer and use it in GitHub Desktop.
paginate ec2 with ability to filter by tag name and value
#!/usr/bin/env python3
import os
import re
import sys
import time
import math
import json
import boto
import boto3
import random
import inspect
import argparse
import botocore
import traceback
import base64
from x256 import x256
from boto3 import client
from pprint import pformat
from os.path import getmtime
from datetime import datetime
from pygments import highlight
from random import randint, choice
from botocore.exceptions import ClientError
from pygments.lexers import JsonLexer, PythonLexer, TextLexer
from pygments.formatters import TerminalFormatter, Terminal256Formatter
def jprint(**kwargs):
""" easy json.dumps indent=4 printer cause im lazyyyy """
passed_name = list(kwargs.items())[0][0]
passed_data = kwargs[passed_name]
ptype = str(type(passed_data))
print_header = "+---- [{} - {}] --\n".format(passed_name, ptype)
print_footer = "+------------------"
print(print_header)
found = False
if isinstance(passed_data, dict):
pre_color = json.dumps(passed_data, indent=4, default=convert_or_pass)
print(highlight(pre_color, JsonLexer(), Terminal256Formatter()))
found = True
elif isinstance(passed_data, list):
pre_color = json.dumps(passed_data, indent=4, default=convert_or_pass)
print(highlight(pre_color, JsonLexer(), Terminal256Formatter()))
found = True
elif isinstance(passed_data, str):
print(highlight(passed_data, TextLexer(), Terminal256Formatter()))
found = True
elif isinstance(passed_data, int):
print(passed_data)
found = True
if inspect.getmembers(passed_data, inspect.isclass) and not found:
print(highlight(pformat(passed_data), PythonLexer(), Terminal256Formatter()))
print(str(passed_data))
print(print_footer)
_color_map = {
"hblue": "\x1b[34;1m",
"hgreen": "\x1b[32;1m",
"hcyan": "\x1b[36;1m",
"hmagenta": "\x1b[35;1m",
}
class PrettyLogger(object):
""" -
use PrettyLogger(log_level="warn", msg="H|sup yo colorcode|highlight colorcode|me colorcode|MULTIPLE TIMES!")
"""
_log_level_map = {
"error" : ["red", "\x1b[31m"],
"warn" : ["yellow", "\x1b[33m"],
"info" : ["green", "\x1b[32m"],
"verbose" : ["blue", "\x1b[34;1m"],
"other" : ["cyan", "\x1b[36m"],
"invalid" : ["red", "\x1b[36m"],
"creset" : ["creset", "\x1b[0m"],
"debug" : ["magenta", "\x1b[35m"],
}
_color_map = {
"red": "\x1b[31m",
"yellow": "\x1b[33m",
"green": "\x1b[32m",
"blue": "\x1b[34m",
"cyan": "\x1b[36m",
"magenta": "\x1b[35m",
"hred": "\x1b[31;1m",
"hyellow": "\x1b[33;1m",
"hblue": "\x1b[34;1m",
"hgreen": "\x1b[32;1m",
"hcyan": "\x1b[36;1m",
"hmagenta": "\x1b[35;1m",
"creset": "\x1b[0m",
}
def highlight(self, msg):
if msg.startswith("H|"):
reset_color = self._color_map["creset"]
msg = msg.partition("|")[2]
matches = re.findall(r'\((.*?)\)', msg)
for each in matches:
original_msg = f"({each})"
msg_split = each.split('|')
color_code = msg_split[0]
words = msg_split[1]
if color_code not in self._color_map:
highlight_color = self._color_map["yellow"]
else:
highlight_color = self._color_map[color_code]
words = f'{highlight_color}{words}{reset_color}'
msg = msg.replace(original_msg, words)
return msg
def fix_header(self):
return "[\033[92m{} - {}\x1b[0m]".format(datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), self.header)
def __init__(self, **kwargs):
log_level = kwargs["log_level"]
if log_level not in self._log_level_map:
log_level = "invalid"
msg = kwargs["msg"]
self.log_level_color = self._log_level_map[log_level][1]
self.log_level_color_reset = self._log_level_map["creset"][1]
self.log_level = ":{}{:>7s}{}:".format(self.log_level_color, log_level, self.log_level_color_reset)
called = str(list(sys._current_frames().items())[0][0])
self.header = [v[1] for k, v in enumerate(inspect.stack()) if called not in str(v[0])][1].split('/')[-1]
self.header = self.fix_header()
msg = self.highlight(msg)
print("{} {} {}".format(self.header, self.log_level, msg))
class super_except(Exception):
""" Super Exception Class """
def __init__(self, *args, **kwargs):
self.original_exception = kwargs.pop('original_exception', None)
if self.original_exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.traceback_text = '\n'.join(traceback.format_tb(exc_traceback))
Exception.__init__(self, *args)
@property
def name(self):
return self.__class__.__name__
def __repr__(self):
output = super(super_except, self).__repr__()
output += '\n'
if self.original_exception:
output += '------------------------------------------------------------\nORIGINAL EXCEPTION:\n'
output += '{}\n'.format(self.traceback_text)
output += repr(self.original_exception)
return output
def __str__(self):
return repr(self)
class SmartFormatter(argparse.HelpFormatter):
def _split_lines(self, text, width):
if text.startswith('R|'):
return text[2:].splitlines()
# this is the RawTextHelpFormatter._split_lines
return argparse.HelpFormatter._split_lines(self, text, width)
def convert_or_pass(obj):
""" Allows storing of crappy AWS datestamp """
if hasattr(obj, 'isoformat'):
return obj.isoformat()
else:
json.JSONEncoder.default(self, obj)
def chunkify(input, n):
return [input[i::n] for i in range(n)]
def config_parser():
""" argparse """
parser = argparse.ArgumentParser(description="hit ec2 for results", formatter_class=SmartFormatter, epilog="example: ec2list --tag environment --value staging --region us-east-1 --profile imeet2-sysops")
parser.add_argument("--profile", help="R|Desired AWS profile\n\nexample: default (default)\n someotheraccount", default="imeet2-sysops")
parser.add_argument("--region", help="R|Desired AWS region\n\nexample: us-east-1 (us-east-1)\n eu-central-1", default="us-east-1")
parser.add_argument("--all", required=False, action='store_true', help="R|Get everything in region specified.")
parser.add_argument("--inverse", required=False, action='store_true', help="R|Show everything not with this tag.")
parser.add_argument("--showtags", required=False, action='store_true', help="R|Show tags for system in output.")
parser.add_argument("--tag", required=False, help="R|Look for this Tag, (case sensitive)\n\nexample: Name or Environment")
parser.add_argument("--value", required=False, help="R|Look for this value in tag values, (case sensitive)\n\nexample: olympus, Production, or Staging")
parser.add_argument("--clear", required=False, action='store_true', help="R|clear cache")
return parser
class AWSdo(object):
""" READS MANIFEST PASSED FROM INPUT HANDLER:
"""
def __init__(self, **kwargs):
self.manifest = kwargs['manifest']
self.region = self.manifest["region"]
self.profile = self.manifest["profile"]
self.tag_name = self.manifest["tag_name"]
self.tag_value = self.manifest["tag_value"]
self.all = self.manifest["all"]
self.inverse = self.manifest["inverse"]
self._service_map = {
"ec2": {
"call": "describe_instances",
"filter" : "Reservations[].Instances[?Tags[]|[?starts_with(Key, '{}')]|[?contains(Value, '{}')]]".format(self.tag_name, self.tag_value),
"allfilter" : "Reservations[].Instances[?Tags[]|[?starts_with(Key, '{}')]|[?contains(Value, '{}')]]".format("", ""),
"inversefilter" : "Reservations[].Instances[?Tags[]|[?starts_with(Key, '{}')]|[?!not_null(Value, '{}')]]".format("", "")
},
}
self.service = self.manifest["service"]
self.service_call = self._service_map[self.service]["call"]
if self.all:
self.filter = self._service_map[self.service]["allfilter"]
elif self.inverse:
self.filter = self._service_map[self.service]["inversefilter"]
else:
self.filter = self._service_map[self.service]["filter"]
# print(self.filter)
bytes_filter = '{}{}'.format(self.profile, self.filter).encode()
bytes_plus = '{}{}'.format(bytes_filter, self.region).encode()
# filter_plus = "{}{}".format(bytes_filter, bytes_plus)
fn = base64.urlsafe_b64encode(bytes_plus)
fn = fn.decode()
fn = fn.replace("=", '').replace(".", '')
chunks, chunk_size = len(fn), len(fn) // 6
self.dir_path = "/".join([fn[i:i + chunk_size] for i in range(0, chunks, chunk_size)][:-1])
self.file_name = "{}.json".format([fn[i:i + chunk_size] for i in range(0, chunks, chunk_size)][-1])
self.dir_path = f"/tmp/ec2list/{self.dir_path}"
self.full_path = f"{self.dir_path}/{self.file_name}"
def touch(self):
with open(self.full_path, 'a'):
os.utime(self.full_path, None)
def check_cache(self):
# print(f"checking cache file {self.full_path}")
if not os.path.isfile(self.full_path):
# print("no cache found")
# os.makedirs(file_path, exist_ok=True)
# self.touch(full_path)
return False
elif os.path.isfile(self.full_path):
# check time
# print("file found checking age.")
modified_time = os.path.getmtime(self.full_path)
current_time = time.time()
time_since = current_time - modified_time
if time_since > 86400:
# print("file older than 60 minutes, getting new cache.")
os.remove(self.full_path)
return False
elif args.clear:
os.remove(self.full_path)
return False
else:
return True
def write_cache(self, json_data):
os.makedirs(self.dir_path, exist_ok=True)
self.touch()
f = open(self.full_path, 'w')
f.write(json.dumps(json_data, default=convert_or_pass))
f.close()
def get_paginate(self, session=None):
client = session.client(self.service)
paginator = client.get_paginator(self.service_call)
page_iterator = paginator.paginate(
PaginationConfig={'PageSize': 100}
)
results = page_iterator.search(self.filter)
return results
def paginate_aws(self, service=None, filter_req=None):
found_objects = []
try:
session = boto3.Session(region_name=self.region, profile_name=self.profile)
check_cache = self.check_cache()
if not check_cache:
collect = list(self.get_paginate(session=session))
self.write_cache(collect)
elif check_cache:
collect = json.loads(open(self.full_path).read())
for each_obj in collect:
found_objects.append(each_obj)
if len(found_objects) == 0:
PrettyLogger(log_level="error", msg="H|(hred|No service returned) - (yellow|Correct Criteria? Try using the another profile.)")
quit()
except Exception as e:
PrettyLogger(log_level="error", msg="H|(hred|Oh No! Something went wrong paginating aws!)", flip=True)
raise super_except(inspect.currentframe().f_code.co_name, original_exception=e)
return found_objects
def get_ec2_for_tags(self):
if self.inverse:
PrettyLogger(log_level="info", msg=f'H|getting all EC2 systems that match are (hred|MISSING) tag name: (hgreen|{self.tag_name}) value: (hblue|{self.tag_value}) region: (hred|{self.region})')
else:
PrettyLogger(log_level="info", msg=f'H|getting all EC2 systems that match tag name: (hgreen|{self.tag_name}) value: (hblue|{self.tag_value}) region: (hred|{self.region})')
results = self.paginate_aws()
return results
if __name__ == '__main__':
parser = config_parser()
args = parser.parse_args()
manifest = {
"region" : args.region if args.region else None,
"profile": args.profile if args.profile else None,
"all": args.all if args.all else None,
"inverse": args.inverse if args.inverse else None,
"tag_value": args.value if args.value else None,
"tag_name": args.tag if args.tag else None,
"showtags": args.showtags if args.showtags else None,
"service": "ec2"
}
# debug manifest options here
# print(manifest)
a = manifest.get("tag_value")
b = manifest.get("tag_name")
c = manifest.get("all")
if ((b and c) or (a and c)):
PrettyLogger(log_level="error", msg="H|(hred|Syntax Error): you must choose either --all, or --tag with --value or --all on its own.")
quit()
results = AWSdo(manifest=manifest).get_ec2_for_tags()
last_color = None
print("--------------------------------------------------------------------------------")
count = 0
for each_result in results:
if len(each_result) > 0:
count += 1
instance = each_result[0]
if instance.get("Tags"):
tags = {d['Key']: d['Value'] for d in instance.get("Tags")}
instanceid = instance.get("InstanceId")
ip = instance.get("PrivateIpAddress") if instance.get("PrivateIpAddress") else "None Found"
pub_ip = instance.get("PublicIpAddress") if instance.get("PublicIpAddress") else "None Found"
state = instance.get("State").get("Name") if instance.get("State").get("Name") else "None Found"
instance_type = instance.get("InstanceType") if instance.get("InstanceType") else "None Found"
launch_time = instance.get("LaunchTime") if instance.get("LaunchTime") else "None Found"
name = tags.get("Name", "NoName")
name = name[:33] + (name[33:] and '..')
ix = x256.from_rgb(randint(randint(36, 175), 255), randint(randint(36, 175), 255), randint(randint(36, 175), 255))
rand_color = f"\x1b[38;5;{ix}m"
creset = "\x1b[0m"
if rand_color == last_color:
ix = x256.from_rgb(randint(randint(36, 175), 255), randint(randint(36, 175), 255), randint(randint(36, 175), 255))
rand_color = f"\x1b[38;5;{ix}m"
last_color = rand_color
print(f'{rand_color}{count:4} {args.region:15} - {name:35} - {instance_type:14} {instanceid:20} - {ip:15}/ {pub_ip:15} {state:10} - {launch_time:20}{creset}')
col_1 = []
col_2 = []
if manifest.get("showtags"):
for i, (k, v) in enumerate(tags.items()):
k = (k[:17] + '..') if len(k) > 17 else k
v = (v[:20] + '..') if len(v) > 20 else v
string = f'{k:20} : {v:30}'
if i % 2 == 0:
col_1.append(string)
else:
col_2.append(string)
for col_1, col_2 in zip(col_1, col_2):
print(f'\t{col_1:30}\t\t {col_2:30}')
print("--------------------------------------------------------------------------------")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment