@jpbarto
Last active January 12, 2024 13:13
Simple script to read users in a Cognito user pool, check them for failed logins, and put those failed logins to CloudWatch logs
#!/usr/bin/env python3
"""
The following script demonstrates how to use the AWS Boto3 SDK to iterate through
all of the users in an AWS Cognito User Pool and examine the events associated
with each user.

If any failed authentication events are found, the script formats them as messages
and logs them to CloudWatch Logs.

This script could easily be modified to run periodically as a Lambda function
triggered by a scheduled CloudWatch rule. If transforming this script into a
Lambda function, I would recommend using os.environ to retrieve parameters for
the Lambda function such as the AWS Region and the Cognito User Pool ID.
"""

import boto3

# set the region to operate in
region = boto3.session.Session().region_name

# create clients for Cognito Identity Provider (user pools) and CloudWatch Logs
idp = boto3.client ('cognito-idp', region_name = region)
logs = boto3.client ('logs', region_name = region)

# define the user pool this script will work with
user_pool_id = '<your user pool here>'

def get_users ():
    """
    Retrieve a list of users from the Cognito user pool.

    Returns a list of dictionaries in the form of:
    [
        {'username': 'user001', 'email': 'user001@example.com'},
        ...
        {'username': 'scott', 'email': 'tiger@example.com'}
    ]
    """
    usernames = list ()

    # NOTE if your user pool has a lot of users in it you will need to look for
    # a pagination token in the response. Subsequent requests to the user pool
    # should reference the pagination token to get the next set of users.
    #
    # For more detail see
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.list_users
    users_resp = idp.list_users (
        UserPoolId = user_pool_id,
        AttributesToGet = ['email'])

    # iterate over the returned users and extract username and email
    for user in users_resp['Users']:
        user_record = {'username': user['Username'], 'email': None}

        # a user without an email attribute keeps email = None
        for attr in user.get ('Attributes', []):
            if attr['Name'] == 'email':
                user_record['email'] = attr['Value']

        usernames.append (user_record)

    return usernames

def get_auth_events (username):
    """
    For a given username retrieve the most recent authentication events up to
    a maximum of RESULT_LIMIT events. This is an arbitrary value set by the author.

    Returns a list of dictionaries of the form
    [
        {'event_type': 'SignIn',
         'timestamp': '2018-09-24T23:58:04Z',
         'unix_timestamp': 1537833484000,
         'result': 'Fail',
         'ip_address': '192.158.68.23',
         'location_city': 'London',
         'location_country': 'United Kingdom'},
        ...
    ]
    """
    RESULT_LIMIT = 5
    auth_events = list ()

    # List user auth events may also return a 'next' token if more events
    # were requested than can be retrieved in a single call. You may want
    # to keep an eye out for a token in the response to know that there are
    # more events waiting for retrieval.
    # For more see
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.admin_list_user_auth_events
    events_resp = idp.admin_list_user_auth_events (
        UserPoolId = user_pool_id,
        Username = username,
        MaxResults = RESULT_LIMIT)

    for event in events_resp['AuthEvents']:
        # context fields are read with .get so a missing value becomes None
        context = event.get ('EventContextData', {})
        event_record = {
            'event_type': event['EventType'],
            'timestamp': str(event['CreationDate']),
            'unix_timestamp': int(event['CreationDate'].timestamp ()*1000),
            'result': event['EventResponse'],
            'ip_address': context.get ('IpAddress'),
            'location_city': context.get ('City'),
            'location_country': context.get ('Country')
        }
        auth_events.append (event_record)

    return auth_events

def prepare_log_stream (group_name, stream_name):
    """
    Create a CloudWatch log group and log stream if they don't already exist.

    group_name (string) name of the log group to be created
    stream_name (string) name of the log stream to be created in the group

    Returns None
    """
    group_exists = False
    stream_exists = False

    # call describe to determine if the log group already exists
    resp = logs.describe_log_groups (logGroupNamePrefix = group_name)
    for group in resp['logGroups']:
        if group['logGroupName'] == group_name:
            group_exists = True

    # if the group wasn't found assume it doesn't exist and create it
    # (use the function arguments here, not the module-level constants)
    if not group_exists:
        logs.create_log_group (logGroupName = group_name)

    # call describe to determine if the log stream already exists
    resp = logs.describe_log_streams (logGroupName = group_name, logStreamNamePrefix = stream_name)
    for stream in resp['logStreams']:
        if stream['logStreamName'] == stream_name:
            stream_exists = True

    # if the stream wasn't found, create it
    if not stream_exists:
        logs.create_log_stream (logGroupName = group_name, logStreamName = stream_name)

# arbitrary group and stream names for CloudWatch log messages
GROUP_NAME = '/myorg/myapp'
STREAM_NAME = 'failed_logins'

# ensure that the log group and stream exist before proceeding
prepare_log_stream (GROUP_NAME, STREAM_NAME)

# iterate over all the users in the user pool, retrieve authentication events for every user
# and log failed login attempts to CloudWatch Logs
for user in get_users ():
    log_entries = list ()

    # iterate over all the events for a user and look for failed logins
    for event in get_auth_events (user['username']):
        if event['result'] == 'Fail':
            log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format (
                user['username'],
                user['email'],
                event['timestamp'],
                event['ip_address'],
                event['location_city'],
                event['location_country'])

            # CloudWatch log entries need to have a timestamp (in millis since 1970) and a message
            # we defined the message above and are using the timestamp retrieved from Cognito
            log_entries.append ({
                'timestamp': event['unix_timestamp'],
                'message': log_message
            })

    # put_log_events rejects an empty batch, so skip users with no failed logins
    if not log_entries:
        continue

    # CloudWatch Logs requires the log entries to be ordered by timestamp
    log_entries.sort (key = lambda entry: entry['timestamp'])

    # use the put log events CloudWatch API to record the messages to CW logs
    # for more info see
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events
    logs.put_log_events (
        logGroupName = GROUP_NAME,
        logStreamName = STREAM_NAME,
        logEvents = log_entries
    )
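
The note in get_auth_events mentions that the auth events call can hand back a token when more events are waiting. A minimal sketch of following that token, assuming the response carries it under 'NextToken' as the boto3 reference describes. The helper name iter_auth_events and its page_size parameter are hypothetical and not part of the script above; the client is passed in so the helper can be tried against a stub.

```python
def iter_auth_events(idp_client, user_pool_id, username, page_size=60):
    """
    Yield every auth event for a user, following NextToken until exhausted.

    NOTE: illustrative helper only; it is not part of the original script.
    """
    kwargs = {
        'UserPoolId': user_pool_id,
        'Username': username,
        'MaxResults': page_size,
    }
    while True:
        resp = idp_client.admin_list_user_auth_events(**kwargs)
        for event in resp.get('AuthEvents', []):
            yield event

        token = resp.get('NextToken')
        if not token:
            break

        # request the page that follows the one just consumed
        kwargs['NextToken'] = token
```

With the clients defined in the script, this could be called as iter_auth_events(idp, user_pool_id, 'user001') in place of the single-page request.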
@ToddBradley

Note that in order for this to work, the User Pool must have AdvancedSecurityMode enabled. Otherwise, you cannot use the AdminListUserAuthEvents call made in get_auth_events.
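
One way to check this up front is to read the pool's add-ons with describe_user_pool before asking for auth events. A rough sketch, assuming the mode is reported under UserPool, UserPoolAddOns, AdvancedSecurityMode with the values OFF, AUDIT, or ENFORCED, as in the boto3 reference. The function name advanced_security_enabled is made up for illustration.

```python
def advanced_security_enabled(idp_client, user_pool_id):
    """
    Return True if the pool reports AdvancedSecurityMode of AUDIT or ENFORCED.

    NOTE: hypothetical helper; a pool with no add-ons block is treated as OFF.
    """
    resp = idp_client.describe_user_pool(UserPoolId=user_pool_id)
    add_ons = resp.get('UserPool', {}).get('UserPoolAddOns', {})
    mode = add_ons.get('AdvancedSecurityMode', 'OFF')
    return mode in ('AUDIT', 'ENFORCED')
```

Calling this once before looping over users gives a clearer failure than an error from the first auth events request.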

@sergiors

sergiors commented Jul 5, 2022

To paginate the users you can use this:

def list_users(**kwargs):
    try:
        result = cognito_idp.list_users(Limit=60, **kwargs)
    except Exception:
        return []
    else:
        for user in result.get('Users'):
            attrs = {attr['Name']: attr['Value'] for attr in user['Attributes']}
            yield dict(attrs, status=user['UserStatus'])
        if token := result.get('PaginationToken'):
            # overwrite any earlier token; passing PaginationToken twice raises TypeError
            yield from list_users(**{**kwargs, 'PaginationToken': token})

>>> for user in list_users(UserPoolId=...):
...     print(user)

@waterimp

waterimp commented Sep 12, 2022

@sergiors - Thank you for the pagination code. I had to make two fixes and then your code paginated properly for me.

  1. the last line of code is missing user_pool_id so that the function can recurse properly.
  2. I got rid of the walrus (:=) operator because I am not using Python 3.8 yet. :)

def list_users(user_pool_id, **kwargs):
    result = cognito_idp.list_users(UserPoolId=user_pool_id, Limit=60, **kwargs)
    for user in result.get('Users'):
        attrs = {attr['Name']: attr['Value'] for attr in user['Attributes']}
        yield dict(attrs, status=user['UserStatus'])
    token = result.get('PaginationToken')
    if token:
        # keep the caller's other arguments (e.g. Filter) on later pages
        yield from list_users(user_pool_id, **{**kwargs, 'PaginationToken': token})

@sergiors

sergiors commented Sep 13, 2022

@waterimp I recommend passing the UserPoolId parameter via kwargs.
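
Following that suggestion, here is a loop-based sketch that takes UserPoolId (and any other list_users arguments) through kwargs and avoids recursion, so a large pool does not deepen the call stack. It assumes the client is passed in explicitly rather than read from a global; iter_users is a hypothetical name, not something from the gist.

```python
def iter_users(idp_client, **kwargs):
    """
    Yield one dict per user across all pages of list_users.

    NOTE: illustrative helper; kwargs are forwarded to list_users unchanged,
    apart from a default Limit and the PaginationToken added between pages.
    """
    params = dict(kwargs)
    params.setdefault('Limit', 60)

    while True:
        result = idp_client.list_users(**params)
        for user in result.get('Users', []):
            attrs = {attr['Name']: attr['Value'] for attr in user.get('Attributes', [])}
            yield dict(attrs, status=user['UserStatus'])

        token = result.get('PaginationToken')
        if not token:
            break

        # carry the token forward; every other argument stays the same
        params['PaginationToken'] = token
```

Usage would look like: for user in iter_users(cognito_idp, UserPoolId=...): print(user)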
