Skip to content

Instantly share code, notes, and snippets.

@shinsaka
Last active March 28, 2024 06:06
Show Gist options
  • Save shinsaka/be90ea4bbe052a5aaf99c71594caff59 to your computer and use it in GitHub Desktop.
Save shinsaka/be90ea4bbe052a5aaf99c71594caff59 to your computer and use it in GitHub Desktop.
Get all event messages of a log group and stream from AWS CloudWatch Logs
import boto3
from datetime import datetime
def get_events(log_group_name, log_stream_name, region_name=None):
    """
    Yield every event of one CloudWatch Logs stream, oldest first.

    The stream is read page by page with ``get_log_events``; events are
    yielded lazily as each page arrives, so the caller never holds the
    whole stream in memory.

    Args:
        log_group_name: Name of the log group that owns the stream.
        log_stream_name: Name of the log stream to read.
        region_name: Region passed to ``boto3.client``; ``None`` leaves the
            choice to boto3's own configuration.

    Yields:
        The items of each response's ``'events'`` list, unchanged.
    """
    client = boto3.client('logs', region_name=region_name)

    # startFromHead is sent on EVERY request: the GetLogEvents API requires
    # it to be true whenever a previous nextForwardToken is used as nextToken.
    request = {
        'logGroupName': log_group_name,
        'logStreamName': log_stream_name,
        'startFromHead': True,
    }

    prev_token = None  # no token has been sent yet on the first request
    while True:
        response = client.get_log_events(**request)
        next_token = response['nextForwardToken']

        # Getting back the very token we just sent means the end of the
        # stream has been reached; stop without yielding this page.
        if next_token == prev_token:
            break

        yield from response['events']

        prev_token = next_token
        request['nextToken'] = next_token
# Example: dump one Lambda log stream to stdout, one event per line.
log_group = '/aws/lambda/hogeprocedure'
log_stream = '2018/10/10/[$LATEST]7673b1e6c7f233ae9dbebcfe494f5836'
for event in get_events(log_group, log_stream):
    # The event timestamp is divided by 1000 before conversion
    # (CloudWatch Logs reports it in milliseconds since the epoch).
    seconds = int(event['timestamp']) / 1000
    # No tz argument is given, so the result is a naive local-time datetime.
    when = datetime.fromtimestamp(seconds)
    print('{} {}'.format(when, event['message']))
@muzhig
Copy link

muzhig commented Sep 15, 2022

yield from would make it prettier ;-)

@shinsaka
Copy link
Author

Thank you for your comment!
I changed the code and reduced one level of loop nesting by using yield from.
Do you mean like this? @muzhig

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment