Skip to content

Instantly share code, notes, and snippets.

@kennovation1
Last active August 23, 2021 16:07
Show Gist options
  • Save kennovation1/343207088d48d3e47cbee32cc5585c13 to your computer and use it in GitHub Desktop.
Save kennovation1/343207088d48d3e47cbee32cc5585c13 to your computer and use it in GitHub Desktop.
Continuously monitor and pretty print Amazon EventBridge traffic as captured in CloudWatch Logs
# busmon.py
# Monitor CloudWatch Logs for EventBridge traffic.
#
# This script assumes that there is a Lambda function snooping an EventBridge bus
# and logging the events to a CloudWatch Logs group.
#
# Your format will likely be different and will require slight adjustments to the parsing
# logic below. The default format assumed for this version is:
# timestamp and other stuff|field|field|field|SNOOPED detail-type: <eventName> jsonString
# That is 5 pipe-delimited fields, where the last field is the only field used.
# The 5th field uses a static preamble of "SNOOPED detail-type: " followed by the event name and
# a JSON string with the event details.
#
# To use this script, read through and change as needed for your use case. Most areas that need
# to be reviewed are annotated with '#NOTE Review'
#
# Requires:
# Python 3.7+ (i.e., only tested on 3.7.4)
# AWS CLI v2 installed on system
# or
# awslogs (pip install awslogs)
#
# Usage:
# python busmon.py [dev|prod]
#
# Author: Ken Robbins - CloudPegboard.com
import sys
import json
from subprocess import Popen, PIPE, STDOUT
# Target environment: the single optional command-line argument, else 'dev'.
env = sys.argv[1] if len(sys.argv) == 2 else 'dev'
class style:
    """Namespace of ANSI escape sequences used to decorate terminal output."""

    # Foreground colours
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # Text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Reset: switches all colours/attributes back off
    ENDC = '\033[0m'
# Sample event: one captured log line in the default pipe-delimited format
# (4 leading fields, then "SNOOPED detail-type: <eventName> <json>").
# It is the input for the commented-out prettyPrintEvent(sampleEvent) call in MAIN.
# NOTE Review
sampleEvent = '''2020-08-12T14:15:41 2020-08-12 14:15:41,852|INFO|go2:snoop|36|SNOOPED detail-type: app_home_opened {"version": "0", "id": "1e62edf5-f735-81e5-37f0-7cf5407d4d4c", "detail-type": "app_home_opened", "source": "slackbot-events-dev", "account": "123456789012", "time": "2020-08-12T14:15:40Z", "region": "us-east-1", "resources": [], "detail": {"userId": "MYUSERID", "teamId": "ATEAMID", "apiAppId": "ANAPPID", "channelId": "ACHANNELID", "tab": "messages"}}
'''
# NOTE Review 3 lines: log group name, look-back window, and filter pattern.
# `env` is taken from the command line (default 'dev'). It is deliberately NOT
# reassigned here: a hard-coded override would make `python busmon.py dev`
# silently monitor a different environment than the one requested.
logGroup = f'/aws/lambda/slackbot-{env}-snoopHandler'
since = '1h'
filterPattern = 'SNOOPED'
# AWS CLI v2 ("aws logs tail ... --follow")
# TODO This does not yet work due to the --follow not playing nicely with pipes
# Empirically there is about a 14-second latency from event to display
logsCmdAwsCli = [
    '/usr/local/bin/aws', 'logs', 'tail', logGroup,
    '--format', 'short',
    '--since', since,
    '--no-paginate',
    f'--filter-pattern={filterPattern}',
    '--follow',
]
# awslogs (pip install awslogs): "awslogs get <group> ALL ... --watch"
# Empirically there is about a 12-second latency from event to display
logsCmdAwsLogs = [
    'awslogs', 'get', logGroup, 'ALL',
    f'-s={since}',
    f'--filter-pattern={filterPattern}',
    '--watch',
]
def prettyPrintEvent(line):
    """Parse one snooped log line and pretty print the event it carries.

    The expected layout is five pipe-delimited fields, of which only the last
    is used:

        <ts and other stuff>|<field>|<field>|<field>|SNOOPED detail-type: <eventName> <json>

    Args:
        line: A single raw log line (a trailing newline is tolerated).

    Returns:
        None. The event name is printed in colour followed by an indented JSON
        rendering of the event's 'source', 'time' and 'detail' attributes.
        Blank lines are ignored. Lines that do not match the expected layout,
        or whose JSON lacks the required attributes, are reported on stderr
        and skipped so that one odd line cannot stop a long-running monitor.
    """
    text = line.rstrip()
    if not text:
        return

    # NOTE Review. Parse based on your log format
    try:
        msg = text.split('|', 4)[4]
        # msg is "SNOOPED detail-type: <eventName> <json>"; the JSON itself may
        # contain spaces, so split at most 3 times and keep the remainder whole.
        ev = json.loads(msg.split(' ', 3)[3])

        # Format and print for good developer UX (only include attributes you really need)
        # NOTE Review from here to end of function
        eventName = ev['detail-type']
        out = {
            'source': ev['source'],
            'time': ev['time'],
            'detail': ev['detail'],
        }
    except (IndexError, KeyError, TypeError, ValueError):
        # IndexError: too few fields; ValueError: invalid JSON;
        # KeyError/TypeError: JSON is not an event object with the needed keys.
        print(f'Skipping unparseable log line: {text}', file=sys.stderr)
        return

    eventTypeColor = style.OKBLUE
    print(f"\n{style.BOLD}{eventTypeColor}{eventName}{style.ENDC}")
    print(json.dumps(out, indent=4))
########
# MAIN #
########
# Test formatting. Comment out for live use.
# prettyPrintEvent(sampleEvent)
print(f'Starting to monitor log group: {logGroup}')
# cmd = logsCmdAwsCli # Do not use for now since "aws logs" does not work properly yet
cmd = logsCmdAwsLogs

# The context manager closes the pipe and waits for the child on the way out,
# so the log-tailing subprocess is not left behind when monitoring stops.
with Popen(cmd, stdout=PIPE, shell=False, encoding='utf8') as proc:
    try:
        # Iterating the pipe yields lines as they arrive and stops at EOF,
        # i.e. once the child has closed its stdout.
        for line in proc.stdout:
            # Blank lines carry no event; handing them to the parser would fail.
            if line.strip():
                prettyPrintEvent(line)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a --watch session; exit quietly.
        proc.terminate()
# snoop.py
# Watch all events and log, ignore, or do something else
# to help with debugging as specified by env var.
#
# Make sure to set up EventBridge rules to set this function as a target
# for all events that you might want to monitor.
#
# Runtime: Python 3.7
#
# Environment:
# Specify to snoop all events or a pipe-delimited list of event names.
# export SNOOP_EVENTS=ALL
# export SNOOP_EVENTS=NONE
# export SNOOP_EVENTS='eventName1|eventName2|...'
#
# Here's an example Serverless Framework serverless.yml fragment for deploying this function:
# snoopHandler:
# handler: snoop.snoopHandler
# description: Catch all events and dump to logs or other debug actions
# memorySize: 128
# timeout: 3
# environment:
# SNOOP_EVENTS: 'ALL'
# events:
# - eventBridge:
# pattern:
# source:
# - 'slackbot-source1-${self:provider.stage}'
# - 'slackbot-source2-${self:provider.stage}'
# - 'slackbot-source3-${self:provider.stage}'
# - 'slackbot-source4-${self:provider.stage}'
#
# Author: Ken Robbins - CloudPegboard.com
import os
import json
import logging
import unittest
from logutils import setupLogging
def snoopHandler(event, context):
    """Lambda handler that logs EventBridge events selected by SNOOP_EVENTS.

    SNOOP_EVENTS (environment variable, default 'ALL') is one of:
      'NONE'          - ignore every event
      'ALL'           - log every event
      'name1|name2'   - log only events whose detail-type is listed

    Args:
        event: The EventBridge event dict; its 'detail-type' key is read.
        context: Lambda context object (unused).

    Returns:
        'Ignored event', 'Logged event: <detail-type>' or
        'Skipped event: <detail-type>'.
    """
    selection = os.environ.get('SNOOP_EVENTS', 'ALL')
    if selection == 'NONE':
        return 'Ignored event'

    # You do you. setupLogging is a local utility that configures the logger
    # the way CloudPegboard.com does logging. Yours will likely be different.
    # Here's the format string that setupLogging happens to use:
    # '%(asctime)s|%(levelname)s|go2:%(module)s|%(lineno)d|%(message)s'
    logger = setupLogging(logging.INFO)

    eventName = event['detail-type']
    wanted = selection == 'ALL' or eventName in selection.split('|')
    if not wanted:
        return f'Skipped event: {eventName}'

    logger.info(f'SNOOPED detail-type: {eventName} ' + json.dumps(event))
    return f'Logged event: {eventName}'
#############
# unittests #
#############
class TestController(unittest.TestCase):
    """Exercise snoopHandler under each supported SNOOP_EVENTS setting."""

    def setUp(self):
        # Every test overwrites SNOOP_EVENTS. Remember the incoming value and
        # restore it afterwards so tests do not leak configuration into each
        # other or into the surrounding process.
        self._savedSnoopEvents = os.environ.get('SNOOP_EVENTS')
        self.addCleanup(self._restoreSnoopEvents)

        self.event = {
            "version": "0",
            "id": "someLongIdString",
            "detail-type": "app_home_opened",
            "source": "slackbot-events-dev",
            "account": "123456789012",
            "time": "2020-07-31T18:27:14Z",
            "region": "us-east-1",
            "resources": [],
            "detail": {
                "userId": "someUserId",
                "teamId": "someTeamId",
                "apiAppId": "someAppId",
                "channelId": "someChannelId",
                "tab": "messages"
            }
        }
        self.eventType = self.event['detail-type']
        # Reference log line printed by test_snoopALL for visual comparison.
        self.logSample = '2020-08-17 17:31:57,750|INFO|go2:snoopblog|36|SNOOPED detail-type: app_home_opened {"version": "0", "id": "someLongIdString", "detail-type": "app_home_opened", "source": "slackbot-events-dev", "account": "123456789012", "time": "2020-07-31T18:27:14Z", "region": "us-east-1", "resources": [], "detail": {"userId": "someUserId", "teamId": "someTeamId", "apiAppId": "someAppId", "channelId": "someChannelId", "tab": "messages"}}'

    def _restoreSnoopEvents(self):
        """Put SNOOP_EVENTS back to the value it had before the test ran."""
        if self._savedSnoopEvents is None:
            os.environ.pop('SNOOP_EVENTS', None)
        else:
            os.environ['SNOOP_EVENTS'] = self._savedSnoopEvents

    def test_snoopALL(self):
        """'ALL' logs the event regardless of its detail-type."""
        os.environ['SNOOP_EVENTS'] = 'ALL'
        response = snoopHandler(self.event, None)
        self.assertEqual(response, f'Logged event: {self.eventType}')
        print('\nLog sample should look like this:')
        print(self.logSample + '\n')

    def test_snoopSpecific(self):
        """An event named in the pipe-delimited list is logged."""
        os.environ['SNOOP_EVENTS'] = f'{self.eventType}|someOtherEvent'
        response = snoopHandler(self.event, None)
        self.assertEqual(response, f'Logged event: {self.eventType}')

    def test_snoopNoMatch(self):
        """An event absent from the pipe-delimited list is skipped."""
        os.environ['SNOOP_EVENTS'] = 'someEvent|someOtherEvent'
        response = snoopHandler(self.event, None)
        self.assertEqual(response, f'Skipped event: {self.eventType}')

    def test_snoopNONE(self):
        """'NONE' ignores the event."""
        os.environ['SNOOP_EVENTS'] = 'NONE'
        response = snoopHandler(self.event, None)
        self.assertEqual(response, 'Ignored event')
########
# MAIN #
########
# Run the unit tests only when this file is executed directly, not on import.
if __name__ == '__main__':
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment