Last active
October 9, 2018 09:54
-
-
Save 1oglop1/9950b033dc655f675ebc11ac122ab815 to your computer and use it in GitHub Desktop.
Convert AWS IAM Credential Report csv to JSON for elasticsearch ingestion.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
AWS IAM credential report produces csv with bit unusual values which | |
complicated ES ingestion. | |
https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_getting-report.html | |
This simple script converts the 'null' values into correct form. | |
""" | |
import csv | |
import json | |
from dateutil.parser import parse | |
# Conversion table | |
# Name type null_value | |
# user str | |
# arn str | |
# user_creation_time ISO time | |
# password_enabled bool | |
# password_last_used ISO time N/A | |
# password_last_changed ISO time N/A | |
# password_next_rotation ISO time N/A | |
# mfa_active bool | |
# access_key_1_active bool | |
# access_key_1_last_rotated ISO time N/A | |
# access_key_1_last_used_date ISO time N/A | |
# access_key_1_last_used_region str N/A | |
# access_key_1_last_used_service str N/A | |
# access_key_2_active bool | |
# access_key_2_last_rotated ISO time N/A | |
# access_key_2_last_used_date ISO time N/A | |
# access_key_2_last_used_region str N/A | |
# access_key_2_last_used_service str N/A | |
# cert_1_active bool | |
# cert_1_last_rotated ISO time N/A | |
# cert_2_active bool | |
# cert_2_last_rotated ISO time N/A | |
def fix_time_na(value): | |
""" | |
Fix time values. | |
Parameters | |
---------- | |
value: str | |
ISO-8601 time or N/A | |
Returns | |
------- | |
Value or None | |
""" | |
try: | |
parse(value) | |
return value | |
except ValueError: | |
return None | |
def fix_str_na(value): | |
""" | |
Fix string values. | |
Parameters | |
---------- | |
value: str | |
String or N/A | |
Returns | |
------- | |
Value or None | |
""" | |
if value == 'N/A': | |
return None | |
return value | |
def fix_bool(value): | |
""" | |
Translate lowercase bool string into boolean. | |
Parameters | |
---------- | |
value: str | |
Lowercase boolean. | |
Returns | |
------- | |
bool or None | |
""" | |
try: | |
return isinstance(eval(value.title())) | |
except NameError: | |
return None | |
def fix_iam_record(record): | |
""" | |
Fix values in IAM record. | |
Parameters | |
---------- | |
record: dict | |
IAM record from csv | |
Returns | |
------- | |
dict | |
Dictionary which contains fixed values. | |
""" | |
fixes = {} | |
bools_to_fix = ( | |
"mfa_active", "access_key_1_active", "cert_1_active", "cert_2_active", "access_key_2_active", "password_enabled" | |
) | |
strings_to_fix = ( | |
"user", "arn", "access_key_1_last_used_region", "access_key_1_last_used_service", | |
"access_key_2_last_used_region", | |
"access_key_2_last_used_service" | |
) | |
times_to_fix = ( | |
"access_key_2_last_rotated", | |
"access_key_2_last_used_date", | |
"access_key_1_last_rotated", | |
"access_key_1_last_used_date", | |
"cert_1_last_rotated", | |
"cert_2_last_rotated", | |
"password_last_used", | |
"password_last_changed", | |
"password_next_rotation", | |
"user_creation_time", | |
) | |
for key in times_to_fix: | |
fixes[key] = fix_time_na(record[key]) | |
for key in strings_to_fix: | |
fixes[key] = fix_str_na(record[key]) | |
for key in bools_to_fix: | |
fixes[key] = fix_bool(record[key]) | |
return fixes | |
def main(): | |
csv_file = "iam-credential-report.csv" | |
with open(csv_file, 'r') as inf: | |
reader = csv.DictReader(inf.readlines()) | |
dd = tuple(reader) | |
print(dd) | |
for usr in dd: | |
fix = fix_iam_record(usr) | |
usr.update(fix) | |
print(dd) | |
print(json.dumps(dd)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment