Skip to content

Instantly share code, notes, and snippets.

@sktse
Last active October 13, 2020 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sktse/c07492813fc617aa316e1b0fd6204ce5 to your computer and use it in GitHub Desktop.
Save sktse/c07492813fc617aa316e1b0fd6204ce5 to your computer and use it in GitHub Desktop.
AWS Cognito - Pull all verified user emails and names

Description

  • This snippit of code pulls down all verified email users directly from Cognito.
    • It will get their email and first name
  • Note: The context and quality of this code is for one time use.
import json
import datetime
import boto3
# Amazon Cognito User Pool Configs
LIMIT = 60
REGION = 'us-east-1'
USER_POOL_ID = '<USER POOL ID>'
# Create boto3 CognitoIdentityProvider client
client = boto3.client(
service_name='cognito-idp',
region_name=REGION,
aws_access_key_id="<YOUR ACCESS ID>",
aws_secret_access_key="<YOUR ACCESS ID>",
)
pagination_token = ""
# Degfine function that utilize ListUsers AWS API call
def get_list_cognito_users(cognito_idp_client, next_pagination_token='', Limit=LIMIT):
if next_pagination_token:
return cognito_idp_client.list_users(
UserPoolId=USER_POOL_ID,
# AttributesToGet = ['name'],
Limit=Limit,
PaginationToken=next_pagination_token
)
return cognito_idp_client.list_users(
UserPoolId=USER_POOL_ID,
# AttributesToGet = ['name'],
Limit=Limit
)
user_records = []
# Pull Batch of Amazon Cognito User Pool records
while True:
user_records_page = get_list_cognito_users(
cognito_idp_client=client,
next_pagination_token=pagination_token,
Limit=LIMIT
)
user_records = user_records + user_records_page["Users"]
pagination_token = user_records_page.get("PaginationToken")
if not pagination_token:
break
# from pprint import pprint
# pprint(user_records)
records = []
for user_record in user_records:
if user_record["UserStatus"] != "CONFIRMED":
continue
attributes = user_record["Attributes"]
given_name = ""
email = ""
for attribute in attributes:
if attribute["Name"] == "given_name":
given_name = attribute["Value"]
elif attribute["Name"] == "email":
email = attribute["Value"]
record = {"email": email, "given_name": given_name}
records.append(record)
from pprint import pprint
pprint(records)
with open(f"./cognito-users-{datetime.datetime.now().strftime('%Y-%m-%d')}.csv", "w+") as f:
for user_record in records:
f.write(user_record["email"])
f.write(",")
f.write(user_record["given_name"])
f.write("\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment