Skip to content

Instantly share code, notes, and snippets.

@bwhaley
Created January 3, 2018 01:27
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save bwhaley/2cf3737ec94ec22417cff41ccf5e2068 to your computer and use it in GitHub Desktop.
Save bwhaley/2cf3737ec94ec22417cff41ccf5e2068 to your computer and use it in GitHub Desktop.
AWS lambda function to subscribe new CloudWatch Log groups to another lambda function
# Lambda function to subscribe all new cloudwatch log groups to a log shipper function
# Used in conjunction with https://github.com/SumoLogic/sumologic-aws-lambda
import os
import logging
import json
import uuid
import boto3
from botocore.exceptions import ClientError
# set logging
log = logging.getLogger()
log.setLevel(logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.WARNING)
l = boto3.client("lambda")
logs = boto3.client("logs")
# Target function for subscriptions
LOG_SHIPPER_FUNCTION = os.environ['TARGET_FUNCTION']
# Common name for subscription filters
FILTER_NAME = "all"
def get_log_groups():
response = logs.describe_log_groups()
log_groups = response["logGroups"]
next_token = response.get("nextToken")
while next_token:
response = logs.describe_log_groups(nextToken=next_token)
log_groups.extend(response["logGroups"])
next_token = response.get("nextToken")
return log_groups
def get_policy():
try:
policy = l.get_policy(FunctionName=LOG_SHIPPER_FUNCTION).get("Policy")
return json.loads(policy)
except ClientError:
log.warn("No policy found for {}".format(LOG_SHIPPER_FUNCTION))
def remove_permission(sid):
l.remove_permission(FunctionName=LOG_SHIPPER_FUNCTION, StatementId=sid)
def get_shipper_arn():
response = l.get_function(FunctionName=LOG_SHIPPER_FUNCTION)
return response["Configuration"]["FunctionArn"]
def subscribe_log_group(lg_name, region, account_id):
log.info("Setting permissions")
response = l.add_permission(
FunctionName=LOG_SHIPPER_FUNCTION,
StatementId=str(uuid.uuid4()),
Action='lambda:InvokeFunction',
Principal="logs.{}.amazonaws.com".format(region),
SourceArn="arn:aws:logs:{}:{}:log-group:{}:*".format(
region,
account_id,
lg_name),
SourceAccount=account_id)
log.debug("Response: {}".format(response))
log.info("Subscribing log group")
logs.put_subscription_filter(
logGroupName=lg_name,
filterName=FILTER_NAME,
filterPattern='',
destinationArn=get_shipper_arn())
def remove_stale_subscriptions():
"""CloudWatch Log groups that were subscribed to the log shipper
but have been deleted but cannot be cleaned up in the UI. This
code searches for stale log groups and cleans up the permissions."""
log.info("Fetching function policy")
policy = get_policy()
if not policy:
return
log.info("Fetching log groups")
log_groups = get_log_groups()
log.info("Discovered {} log groups".format(len(log_groups)))
lg_arns = [lg["arn"] for lg in log_groups]
for statement in policy["Statement"]:
policy_arn = statement["Condition"]["ArnLike"]["AWS:SourceArn"]
if policy_arn not in lg_arns:
log.info("Identified missing log group. Removing the permission")
sid = statement["Sid"]
remove_permission(sid)
def lambda_handler(event, context):
log.info("Running log subscriber")
region = context.invoked_function_arn.split(':')[3]
account_id = context.invoked_function_arn.split(':')[4]
remove_stale_subscriptions()
# Commence subscription functionality
event_name = event['detail']['eventName']
if event_name == "CreateLogGroup":
log_group_name = event['detail']['requestParameters']['logGroupName']
subscribe_log_group(log_group_name, region, account_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment