Created
March 30, 2023 06:37
-
-
Save lkrimphove/613c039c5c0b11835da22189f4db4439 to your computer and use it in GitHub Desktop.
a Lambda function for monitoring your AWS resources
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import logging | |
from dataclasses import dataclass | |
from datetime import datetime | |
from typing import Any | |
import requests | |
import boto3 | |
logger = logging.getLogger() | |
if 'AWS_EXECUTION_ENV' in os.environ and 'LOG_LVL' in os.environ: | |
LOG_LVL = os.environ['LOG_LVL'] | |
logger.setLevel(level=LOG_LVL) | |
else: | |
# for local debugging | |
logger.setLevel(level='DEBUG') | |
if 'TAG_PREFIX' in os.environ: | |
TAG_PREFIX = os.environ['TAG_PREFIX'] + ':' | |
else: | |
TAG_PREFIX = '' | |
@dataclass | |
class BotoClients: | |
tagging: Any = boto3.client('resourcegroupstaggingapi') | |
clients = BotoClients() | |
def init_clients(): | |
global clients | |
if not clients: | |
clients = BotoClients() | |
def main(event, context): | |
# get all resources with restricted lifetime | |
resources = [] | |
tag_filters = [ | |
{ | |
'Key': f'{TAG_PREFIX}lifetime:restricted', | |
'Values': [ | |
'true', | |
] | |
}, | |
] | |
# do | |
response = clients.tagging.get_resources( | |
TagFilters=tag_filters | |
) | |
pagination_token = response['PaginationToken'] | |
resources = resources + response['ResourceTagMappingList'] | |
# while | |
while pagination_token != '': | |
response = clients.tagging.get_resources( | |
TagFilters=tag_filters | |
) | |
pagination_token = response['PaginationToken'] | |
resources = resources + response['ResourceTagMappingList'] | |
logging.debug('### resources with limited lifetime') | |
logging.debug(resources) | |
# filter critical resources | |
critical = [] | |
for resource in resources: | |
lifetime_end = datetime.now() | |
present = datetime.now() | |
for tag in resource['Tags']: | |
if tag['Key'] == 'power:lifetime:end': | |
lifetime_end = datetime.strptime(tag['Value'], '%m/%d/%Y') | |
if lifetime_end <= present: | |
critical.append(resource) | |
logging.debug('### resources that exceeded lifetime') | |
logging.debug(resources) | |
if len(resources) <= 0: | |
logger.debug('No resources that exceed lifetime') | |
return | |
# format resources | |
message_text = '' | |
for resource in critical: | |
message_text += resource_to_string(resource) + '\n' | |
# send to sns | |
url = os.getenv('WEBHOOK_URL') | |
message = { | |
'text': message_text | |
} | |
response = requests.post(url, json=message) | |
logger.debug('### teams response') | |
logger.debug(response.status_code) | |
logger.debug(response.json()) | |
def resource_to_string(resource): | |
result = resource['ResourceARN'] | |
for tag in resource['Tags']: | |
if tag['Key'] == f'{TAG_PREFIX}owner:name': | |
result = f"{result} - owned by {tag['Value']}" | |
if tag['Key'] == f'{TAG_PREFIX}lifetime:end': | |
result = f"{result} - lifetime ended on {tag['Value']}" | |
return result | |
def lambda_handler(event, context): | |
logger.debug('## ENVIRONMENT VARIABLES') | |
logger.debug(json.dumps(dict(**os.environ), indent=4)) | |
logger.debug('## EVENT') | |
logger.debug(json.dumps(event, indent=4)) | |
try: | |
logger.debug('## MAIN FUNCTION START') | |
init_clients() | |
main(event, context) | |
except Exception as e: | |
logger.error(f'Main function raised exception {type(e).__name__} because {e}.') | |
logger.exception(e) | |
raise e | |
logger.debug('## MAIN FUNCTION END') | |
return 'finished' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment