Skip to content

Instantly share code, notes, and snippets.

@kjenney
Created September 5, 2021 16:42
Show Gist options
  • Save kjenney/590af8d1d045c3aae2fbd2c3bbc4e248 to your computer and use it in GitHub Desktop.
Save kjenney/590af8d1d045c3aae2fbd2c3bbc4e248 to your computer and use it in GitHub Desktop.
Python Boto3 Create and Delete Resources
import argparse
from botocore.exceptions import ClientError
import boto3
from cryptography.fernet import Fernet
import json
import logging
from pprint import pprint
import uuid
'''
This script provisions the following resources:
1. An S3 Bucket to be used for keeping Pulumi state that one or more IAM users have access to read/write to
2. A KMS key for encrypting secrets in state that one or more IAM users have access to
3. An encryption key for encrypting secrets in Git
'''
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--aws-region', required=False)
parser.add_argument('-b', '--bucket-name', required=True)
parser.add_argument('-u', '--iam-users', nargs='+', help='A list of IAM user ARNs', required=False)
parser.add_argument('-i', '--iam-name', nargs='+', help='The name of the IAM Role and Policy', default="AccessPulumiStateBucket", required=False)
parser.add_argument('-r', '--replace', help='replace resources if they already exist',
action='store_true')
args = parser.parse_args()
class Bootstrap:
"""Create resources to use Pulumi securely on AWS
If aws_region is not specified, the bucket is created in the S3 default
region (us-east-1).
If iam_users is empty it will be assumed that the user that is being
used to authenticate has access to the S3 bucket already.
If the S3 bucket already exists there is an option to replace it. This is False by default.
param args Object ArgumentParser object
return: True if bucket created, else False
"""
def __init__(self, args):
self.bucket_name = args.bucket_name
self.aws_region = args.aws_region
self.iam_users = self.validate_iam_users(args.iam_users)
self.logger = logging.getLogger(__name__)
self.s3_resource = self.get_s3()
self.iam_client = boto3.client('iam')
self.kms_client = boto3.client('kms')
self.sts_client = boto3.client('sts')
if self.bucket_exists():
if args.replace:
bucket = self.replace_bucket()
print('S3 Bucket already exists. Replacing the bucket.')
else:
print('S3 Bucket already exists. Not replacing the bucket.')
else:
bucket = self.create_bucket()
print('S3 Bucket created.')
if self.iam_users:
assume_policy = self.create_iam_assume_role_policy()
if self.iam_role_exists(args.iam_name):
if args.replace:
self.replace_iam_role(assume_policy, args.iam_name)
print('IAM role already exists. Replacing the IAM role.')
else:
print('IAM role already exists. Not replacing the IAM role.')
else:
self.create_iam_role(assume_policy, args.iam_name)
if self.iam_policy_exists(args.iam_name):
if args.replace:
self.replace_iam_policy(bucket, args.iam_name)
print('IAM policy already exists. Replacing the IAM policy.')
else:
print('IAM policy already exists. Not replacing the IAM policy.')
else:
self.create_iam_policy(bucket, args.iam_name)
self.provision_kms_key()
self.provision_fernet_key()
def delete_bucket(self):
"""
Delete a bucket. The bucket must be empty or an error is raised.
Usage is shown in usage_demo at the end of this module.
:param bucket: The bucket to delete.
"""
bucket = self.s3_resource.Bucket(self.bucket_name)
try:
bucket.delete()
bucket.wait_until_not_exists()
self.logger.info("Bucket %s successfully deleted.", bucket.name)
except ClientError:
self.logger.exception("Couldn't delete bucket %s.", bucket.name)
raise
def replace_bucket(self):
"""Replace an S3 bucket"""
self.delete_bucket()
return self.create_bucket()
def replace_iam_role(self, assume_policy, role_name):
"""Replace an IAM Role that already exists"""
self.delete_iam_role(role_name)
return self.create_iam_role(assume_policy, role_name)
def replace_iam_policy(self, bucket, policy_name):
"""Replace an IAM Policy that already exists"""
self.delete_iam_policy(policy_name)
return self.create_iam_policy(bucket, policy_name)
def bucket_exists(self):
"""
Determine whether a bucket with the specified name exists.
:return: True when the bucket exists; otherwise, False.
"""
try:
self.s3_resource.meta.client.head_bucket(Bucket=self.bucket_name)
self.logger.info("Bucket %s exists.", self.bucket_name)
exists = True
except ClientError:
self.logger.warning("Bucket %s doesn't exist or you don't have access to it.",
self.bucket_name)
exists = False
return exists
def validate_iam_users(self, iam_users):
"""
Ensure that iam_users is a valid list
param iam_users: List IAM users arn's to enable access to bucket state
return: None if not all elements are valid arns, else List
"""
if isinstance(iam_users, list):
if any('arn' not in i for i in iam_users):
return None
return iam_users
def get_s3(self):
"""Get a Boto 3 S3 resource with a specific Region or with your default Region."""
s3_resource = boto3.resource('s3')
if not self.aws_region or s3_resource.meta.client.meta.region_name == self.aws_region:
return s3_resource
else:
return boto3.resource('s3', region_name=self.aws_region)
def iam_role_exists(self, role_name):
"""
Determine whether an IAM role with the specified name exists.
:return: True when the IAM role exists; otherwise, False.
"""
try:
self.iam_client.get_role(RoleName=role_name)
self.logger.info("Role %s exists.", role_name)
exists = True
except ClientError:
self.logger.warning("IAM Role %s doesn't exist or you don't have access to it.",
role_name)
exists = False
return exists
def get_iam_policy_arn(self, policy_name):
"""Return an IAM Policy's ARN"""
account_id = self.sts_client.get_caller_identity()['Account']
return f'arn:aws:iam::{account_id}:policy/{policy_name}'
def iam_policy_exists(self, policy_name):
"""
Determine whether an IAM policy with the specified name exists.
:return: True when the IAM policy exists; otherwise, False.
"""
try:
policy_arn = self.get_iam_policy_arn(policy_name)
self.iam_client.get_policy(PolicyArn=policy_arn)
self.logger.info("Policy %s exists.", policy_name)
exists = True
except ClientError:
self.logger.warning("IAM Policy %s doesn't exist or you don't have access to it.",
policy_name)
exists = False
return exists
def iam_detach_policy_from_role(self, iam_name):
"""Detach an IAM policy from an IAM role"""
policy_arn = self.get_iam_policy_arn(iam_name)
self.iam_client.detach_role_policy(
RoleName=iam_name,
PolicyArn=policy_arn
)
def delete_iam_role(self, role_name):
"""Delete an existing IAM role"""
self.iam_detach_policy_from_role(role_name)
self.iam_client.delete_role(
RoleName=role_name
)
def delete_iam_policy(self, policy_name):
"""Delete an existing IAM policy"""
policy_arn = self.get_iam_policy_arn(policy_name)
self.iam_client.delete_policy(
PolicyArn=policy_arn
)
def create_bucket(self):
"""
Create an Amazon S3 bucket with the specified name and in the specified Region.
:return: The newly created bucket.
"""
try:
if not self.aws_region or self.s3_resource.meta.client.meta.region_name == self.aws_region:
bucket = self.s3_resource.create_bucket(Bucket=self.bucket_name)
else:
bucket = self.s3_resource.create_bucket(
Bucket=self.bucket_name,
CreateBucketConfiguration={
'LocationConstraint': self.aws_region
}
)
bucket.wait_until_exists()
self.logger.info("Created bucket '%s' in region=%s", bucket.name,
self.s3_resource.meta.client.meta.region_name)
except ClientError as error:
self.logger.exception("Couldn't create bucket named '%s' in region=%s.",
self.bucket_name, self.aws_region)
if error.response['Error']['Code'] == 'IllegalLocationConstraintException':
self.logger.error("When the session Region is anything other than us-east-1, "
"you must specify a LocationConstraint that matches the "
"session Region. The current session Region is %s and the "
"LocationConstraint Region is %s.",
self.s3_resource.meta.client.meta.region_name, self.aws_region)
raise error
else:
return bucket
def create_iam_policy(self, bucket, iam_name):
"""Create an IAM policy in the current account to be attached to an IAM role"""
policy_document = self.create_iam_role_policy_document(bucket)
policy = self.iam_client.create_policy(
PolicyName=iam_name,
PolicyDocument=policy_document
)
self.iam_client.attach_role_policy(
RoleName=iam_name,
PolicyArn=policy["Policy"]["Arn"]
)
def create_iam_role_policy_document(self, bucket):
"""
Create the IAM policy that is attachd to the IAM role granting
access to the state bucket
"""
dict = {}
dict["Version"] = "2012-10-17"
statements = []
statements.append({"Action":["s3:ListAllMyBuckets"],"Effect":"Allow","Resource":["arn:aws:s3:::*"]})
statements.append({"Action":["s3:ListBucket","s3:GetBucketLocation"],"Effect":"Allow","Resource":[f"arn:aws:s3:::{bucket.name}"]})
statements.append({"Action":["s3:GetObject","s3:PutObject"],"Effect":"Allow","Resource":[f"arn:aws:s3:::{bucket.name}/*"]})
dict["Statement"] = statements
return json.dumps(dict)
def create_iam_role(self, assume_policy, iam_name):
"""Create an IAM role in the current account to be assumed by IAM users"""
return self.iam_client.create_role(
RoleName = iam_name,
AssumeRolePolicyDocument = assume_policy
)
def create_iam_assume_role_policy(self):
"""
Create the AssumeRolePolicyDocument to be attached to the IAM role granting
access to the state bucket
"""
dict = {}
dict["Version"] = "2012-10-17"
statements = []
for i in self.iam_users:
statements.append({"Effect":"Allow","Principal":{"AWS":f"{i}"},"Action":"sts:AssumeRole"})
dict["Statement"] = statements
return json.dumps(dict)
def provision_kms_key(self):
print('Provisioning the KMS Key to secure Pulumi state')
def provision_fernet_key(self):
print('Provisioning the Fernet key for encrypting secrets in Git')
def verify_policy_attachment(self):
"""Verifying IAM Policy Attachment to IAM Role"""
def verify_iam_assume_role_policy_attachment(self):
"""Verifying IAM Assume Role Policy Attachment to IAM Role"""
bootstrap = Bootstrap(args)
#print(bootstrap.bucket_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment