Skip to content

Instantly share code, notes, and snippets.

@rantoniuk
Last active February 14, 2022 16:08
Show Gist options
  • Save rantoniuk/c3d0dae4e499d0c11f0c756f6c1da73c to your computer and use it in GitHub Desktop.
Save rantoniuk/c3d0dae4e499d0c11f0c756f6c1da73c to your computer and use it in GitHub Desktop.
AWS
### Cross-account CloudFront invalidation, triggered from a central pipeline account.
### Example usage as an AWS CDK CodePipeline action:
### new codepipelineActions.LambdaInvokeAction({
### runOrder: 3,
### actionName: 'InvalidateCloudFront',
### role: this.devOpsPipelineRole,
### lambda: Function.fromFunctionArn(this, "test-cloudfrontInvalidator", "arn:aws:lambda:eu-west-1::function:pipelineInvalidateCloudFront"),
### userParameters: {
### bucketName: bucket.bucketName,
### targetPipelineRole: this.testPipelineRole.roleArn
### }
from __future__ import print_function
import json
import time
import urllib
import boto3
import botocore
from boto3.session import Session
# Module-level CloudFront client created with boto3's default configuration.
# NOTE(review): handler() binds its own local `cloudfront_client` (built from the
# assumed role), so this one appears unused in the visible code - confirm before removing.
cloudfront_client = boto3.client('cloudfront')
# CodePipeline client used by put_job_success() / put_job_failure() to report job results.
codepipeline_client = boto3.client('codepipeline')
def handler(event, context):
    """Create a CloudFront invalidation for the distribution fronting an S3 bucket.

    Entry point for a CodePipeline Lambda invoke action. The job's
    UserParameters JSON supplies ``bucketName`` (matched against the origins
    of the CloudFront distributions) and ``targetPipelineRole`` (ARN of the
    role assumed to reach CloudFront). The outcome is always reported back to
    CodePipeline through put_job_success() / put_job_failure().

    Args:
        event: The CodePipeline job event passed to the Lambda function.
        context: The Lambda context object (unused).

    Returns:
        The string 'Success' once a job result has been reported.

    Raises:
        Exception: Any error raised while decoding the parameters, assuming
            the role, looking up the distribution or creating the
            invalidation. The job is marked as failed before re-raising.
    """
    job = event['CodePipeline.job']
    job_id = job['id']
    job_data = job['data']

    bucket = None
    invalidation_id = None
    try:
        params = get_user_params(job_data)
        bucket = params['bucketName']
        # Named differently from the module-level client to avoid shadowing it.
        target_cloudfront = get_cloudfront_client(params['targetPipelineRole'])
        cf_distro_id = get_cloudfront_distribution_id(target_cloudfront, bucket)

        if cf_distro_id:
            print("Creating invalidation from {} for Cloudfront distribution {}".format(
                bucket, cf_distro_id))
            invalidation = target_cloudfront.create_invalidation(
                DistributionId=cf_distro_id,
                InvalidationBatch={
                    'Paths': {
                        'Quantity': 1,
                        'Items': ['/*'],
                    },
                    # Must be unique per request; the current timestamp is enough here.
                    'CallerReference': str(time.time()),
                },
            )
            invalidation_id = invalidation['Invalidation']['Id']
    except Exception as e:
        # The raw event is deliberately not logged: a CodePipeline job event can
        # carry temporary artifact credentials, which must not end up in the logs.
        print("Error processing job {} for bucket {}: {}".format(job_id, bucket, e))
        # Without an explicit failure result the pipeline action would be left
        # waiting for a result that never arrives.
        put_job_failure(job_id, "CloudFront invalidation failed: {}".format(e))
        raise

    if cf_distro_id:
        put_job_success(
            job_id,
            "Submitted invalidation ID {}".format(invalidation_id)
        )
    else:
        print("Bucket {} does not appear to be an origin for a Cloudfront distribution".format(bucket))
        put_job_failure(
            job_id,
            "Bucket is not an origin of any CloudFront distribution"
        )
    return 'Success'
def get_cloudfront_client(role_arn):
    """Build a CloudFront client that acts under an assumed IAM role.

    Args:
        role_arn: ARN of the role to assume through STS.

    Returns:
        A boto3 CloudFront client created from the temporary credentials
        returned by the assume-role call.
    """
    # Ask STS for temporary credentials for the given role.
    assume_role_response = boto3.client('sts').assume_role(
        RoleArn=role_arn,
        RoleSessionName="TargetAccountPipelineRoleSession",
    )
    temp_creds = assume_role_response['Credentials']

    # A dedicated session keeps the assumed-role credentials separate from
    # the module-level clients.
    assumed_session = Session(
        aws_access_key_id=temp_creds['AccessKeyId'],
        aws_secret_access_key=temp_creds['SecretAccessKey'],
        aws_session_token=temp_creds['SessionToken'],
    )

    # NOTE(review): 's3v4' is an S3-oriented signature setting; it is kept as-is
    # here - confirm whether it is actually needed for a CloudFront client.
    client_config = botocore.client.Config(signature_version='s3v4')
    return assumed_session.client('cloudfront', config=client_config)
def put_job_success(job, message):
    """Report a successful job result to CodePipeline.

    Args:
        job: ID of the CodePipeline job to mark as succeeded.
        message: Text printed to the log; it is not sent to CodePipeline.

    Raises:
        Exception: Whatever .put_job_success_result() raises.
    """
    for log_line in ('Putting job success', message):
        print(log_line)
    codepipeline_client.put_job_success_result(jobId=job)
def put_job_failure(job, message):
    """Report a failed job result to CodePipeline.

    Args:
        job: ID of the CodePipeline job to mark as failed.
        message: Text printed to the log and sent as the failure message.

    Raises:
        Exception: Whatever .put_job_failure_result() raises.
    """
    for log_line in ('Putting job failure', message):
        print(log_line)

    failure_details = {
        'message': message,
        'type': 'JobFailed',
    }
    codepipeline_client.put_job_failure_result(
        jobId=job,
        failureDetails=failure_details,
    )
def get_cloudfront_distribution_id(client, bucket):
    """Find the CloudFront distribution that uses *bucket* as an origin.

    An origin matches when its DomainName equals ``bucket`` exactly, or when
    ``bucket`` is a bare bucket name and the DomainName is an S3 endpoint of
    that bucket (``<bucket>.s3.…amazonaws.com`` / ``<bucket>.s3-…amazonaws.com``).

    Args:
        client: A CloudFront client exposing the 'list_distributions' paginator.
        bucket: Bucket name, or the full origin domain name, to look for.

    Returns:
        The ID of the matching distribution, or None when nothing matches.
        When several distributions match, the last one listed wins.
    """
    # str.format keeps this safe even if a non-string bucket value is passed.
    s3_prefixes = ("{}.s3.".format(bucket), "{}.s3-".format(bucket))
    s3_suffixes = ('.amazonaws.com', '.amazonaws.com.cn')

    def is_bucket_origin(domain_name):
        # Exact match (the original behaviour), or an S3 endpoint of the bucket.
        if domain_name == bucket:
            return True
        return domain_name.startswith(s3_prefixes) and domain_name.endswith(s3_suffixes)

    cf_distro_id = None
    print("Looking for origin: {}".format(bucket))
    for page in client.get_paginator('list_distributions').paginate():
        # 'Items' is left out of the response when there are no distributions.
        distributions = page.get('DistributionList', {}).get('Items') or []
        for distribution in distributions:
            origins = distribution.get('Origins', {}).get('Items') or []
            if any(is_bucket_origin(origin['DomainName']) for origin in origins):
                print("Found it!")
                cf_distro_id = distribution['Id']
    return cf_distro_id
def get_user_params(job_data):
    """Decode the JSON user parameters and validate the required properties.

    Args:
        job_data: The job data structure containing the UserParameters string,
            which should be a valid JSON object.

    Returns:
        The JSON parameters decoded as a dictionary. It is guaranteed to
        contain the 'bucketName' and 'targetPipelineRole' keys.

    Raises:
        Exception: The JSON can't be decoded, is not an object, or a required
            property is missing.
    """
    try:
        # The user parameters carry the bucket name and the target role ARN.
        user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
        decoded_parameters = json.loads(user_parameters)
    except (LookupError, TypeError, ValueError):
        # We're expecting the user parameters to be encoded as JSON
        # so we can pass multiple values. If the JSON can't be decoded
        # then fail the job with a helpful message.
        raise Exception('UserParameters could not be decoded from JSON')

    if not isinstance(decoded_parameters, dict):
        raise Exception('UserParameters must be a JSON object')

    # Fail early with a clear message instead of a bare KeyError in the caller.
    for required in ('bucketName', 'targetPipelineRole'):
        if required not in decoded_parameters:
            raise Exception(
                'UserParameters JSON must include the {} property'.format(required))

    return decoded_parameters
# This updates the environment variable values of AWS Lambda functions, since it is currently
# not possible to reference the value directly from SSM in a way that triggers deployment changes.
import boto3
# Walk every Lambda function in eu-west-1 and rewrite the DT_TENANT
# environment variable on those that already define it.
client = boto3.client('lambda', region_name='eu-west-1')

for page in client.get_paginator('list_functions').paginate():
    for fn_summary in page['Functions']:
        # Skip functions without an environment or without DT_TENANT.
        if "Environment" not in fn_summary or "DT_TENANT" not in fn_summary['Environment']['Variables']:
            continue

        fn_name = fn_summary['FunctionName']
        print(fn_name, end='')

        # Start from the function's current variables so the others are kept.
        live_config = client.get_function_configuration(FunctionName=fn_name)
        variables = live_config['Environment']['Variables']
        # NOTE(review): the literal Ellipsis is kept from the original; presumably
        # a placeholder for the real tenant value - replace before running.
        variables['DT_TENANT'] = ...

        client.update_function_configuration(
            FunctionName=fn_name,
            Environment={'Variables': variables},
        )
        print(" changed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment