Skip to content

Instantly share code, notes, and snippets.

@froop
Created March 31, 2023 03:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save froop/1a24a0d3953020e774f8d4b43dace2bf to your computer and use it in GitHub Desktop.
Save froop/1a24a0d3953020e774f8d4b43dace2bf to your computer and use it in GitHub Desktop.
[Python][AWS] FilterLogEvents of CloudWatch Logs
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import boto3
import time
import logging
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
# CloudWatch Logs log group whose events are fetched.
LOG_GROUP_NAME = "/aws/lambda/test-filter-log-events"
# Number of leading characters of each log message that get printed.
NUM_CHARS_TO_PRINT = 10
# Milliseconds subtracted from the current time to obtain the first startTime.
DELAY_OF_START_MS = 5000
# Upper bound, in seconds, for the sleep between polls that found nothing.
MAX_RETRY_SLEEP_SECS = 8

# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Keep the AWS SDK and its HTTP layer quiet unless something goes wrong.
for _library_name in ("boto3", "botocore", "urllib3"):
    logging.getLogger(_library_name).setLevel(logging.WARNING)
del _library_name  # do not leave the loop variable behind as a module global
# Function to print formatted log event
def print_event(event, max_chars=None):
    """Print one log event as ``<timestamp> <message prefix>``.

    Args:
        event: Mapping providing a ``'timestamp'`` value and a ``'message'``
            string.
        max_chars: Maximum number of message characters to print. When
            ``None`` (the default) the module-level ``NUM_CHARS_TO_PRINT``
            is used, which matches the previous behavior.

    If the message is longer than the limit, only its prefix is printed,
    followed by ``... N more`` where N is the number of omitted characters.
    """
    if max_chars is None:
        max_chars = NUM_CHARS_TO_PRINT
    message = event['message']
    truncated_message = message[:max_chars]
    num_chars_truncated = len(message) - len(truncated_message)
    if num_chars_truncated > 0:
        truncated_message = "%s... %d more" % (truncated_message, num_chars_truncated)
    print("%s %s" % (event['timestamp'], truncated_message))
# Function to process the log events from the response
def process_events(response, latest_event_info):
    """Print the not-yet-seen events of one response and track the newest timestamp.

    Args:
        response: Mapping with an ``'events'`` list; every event is read for
            its ``'timestamp'``, ``'eventId'`` and ``'message'`` entries.
        latest_event_info: Dict holding ``'timestamp'`` (the most recent
            event timestamp handled so far) and ``'events'`` (set of event
            IDs already printed for that timestamp). It is updated in place.

    Returns:
        Tuple ``(latest_event_info, response_size)`` where ``response_size``
        is the total message length of the events printed by this call.
    """
    response_size = 0
    for event in response['events']:
        if latest_event_info['timestamp'] != event['timestamp']:
            # Timestamp changed: the IDs remembered for the previous
            # timestamp are no longer needed for de-duplication.
            latest_event_info['timestamp'] = event['timestamp']
            latest_event_info['events'] = set()
        if event['eventId'] in latest_event_info['events']:
            # Already printed by an earlier call. Skipping it here also keeps
            # it out of response_size, so a response consisting only of
            # re-fetched events counts as "nothing new" for the caller.
            continue
        print_event(event)
        latest_event_info['events'].add(event['eventId'])
        response_size += len(event['message'])
    return latest_event_info, response_size
# Function to update the request parameters for the next request
def update_request_params(response, latest_event_info, response_size, request_params):
    """Prepare ``request_params`` for the next request.

    Args:
        response: Mapping returned by the previous request; only the presence
            and value of ``'nextToken'`` are inspected.
        latest_event_info: Dict whose ``'timestamp'`` becomes the new
            ``'startTime'`` when there is no further page.
        response_size: Unused; kept so existing callers keep working.
        request_params: Dict of request parameters. Modified in place.

    Returns:
        The same ``request_params`` dict that was passed in.
    """
    if 'nextToken' in response:
        # More pages are pending: continue with the token and leave the
        # remaining parameters untouched.
        request_params['nextToken'] = response['nextToken']
    else:
        # No further page: restart from the newest timestamp seen so far and
        # drop any token left over from the previous request.
        request_params['startTime'] = latest_event_info['timestamp']
        request_params.pop('nextToken', None)
    return request_params
# Function to handle retries in case of no log events
def handle_retry(response_size, retry_count):
    """Sleep with exponential backoff when the last response brought nothing.

    Args:
        response_size: Size reported for the last response; any value greater
            than zero means no waiting is needed.
        retry_count: Number of consecutive empty polls so far.

    Returns:
        ``0`` when ``response_size`` is positive (the backoff is reset),
        otherwise ``retry_count + 1`` after sleeping for
        ``min(2 ** retry_count, MAX_RETRY_SLEEP_SECS)`` seconds.
    """
    if response_size > 0:
        return 0
    sleep_secs = min(2 ** retry_count, MAX_RETRY_SLEEP_SECS)
    # Pass the values as logging arguments so the message is only formatted
    # when DEBUG output is actually enabled.
    logging.debug('sleep: retry_count=%d, sleep_secs=%d', retry_count, sleep_secs)
    time.sleep(sleep_secs)
    return retry_count + 1
# Function to truncate nextToken for logging output
def truncate_token(nextToken):
    """Shorten a pagination token for log output.

    Args:
        nextToken: Token string, or ``None`` when there is no token.

    Returns:
        ``''`` for ``None``; the token unchanged when it has at most eight
        characters; otherwise its first four and last four characters joined
        by ``...``.
    """
    if nextToken is None:
        return ''
    edge = 4
    if len(nextToken) <= 2 * edge:
        # Head and tail would overlap (e.g. 'abc' -> 'abc...abc'), so the
        # "shortened" form would be longer and misleading.
        return nextToken
    return "%s...%s" % (nextToken[:edge], nextToken[-edge:])
###############
# Main function
###############
# Script body: poll the log group forever and print every new event.
client = boto3.client('logs')
request_params = {
    'logGroupName': LOG_GROUP_NAME,
    # Begin DELAY_OF_START_MS milliseconds before the current (whole-second) time.
    'startTime': int(time.time()) * 1000 - DELAY_OF_START_MS,
}
# Newest timestamp handled so far and the event IDs already printed for it.
latest_event_info = {
    'timestamp': request_params['startTime'],
    'events': set(),
}
retry_count = 0
try:
    # Main loop to continuously fetch log events
    while True:
        # Request to CloudWatch Logs API
        logging.info('request: startTime=%s nextToken=%s',
                     request_params.get('startTime'),
                     truncate_token(request_params.get('nextToken')))
        response = client.filter_log_events(**request_params)
        latest_event_info, response_size = process_events(response, latest_event_info)
        logging.info('response: latest=%d, size=%d, nextToken=%s',
                     latest_event_info.get('timestamp'),
                     response_size,
                     truncate_token(response.get('nextToken')))
        # Prepare for the next request
        request_params = update_request_params(
            response, latest_event_info, response_size, request_params)
        # Sleeps with backoff when the response was empty; resets otherwise.
        retry_count = handle_retry(response_size, retry_count)
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop; leave without a traceback.
    logging.info('interrupted, stopping')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment