Skip to content

Instantly share code, notes, and snippets.

@ZhukovAlexander ZhukovAlexander/pipe.py
Last active Sep 11, 2019

Embed
What would you like to do?
import os
import boto3
from bitbucket_pipes_toolkit import Pipe
# Schema for the pipe variables; it is passed to Pipe() below, where the
# validation of the variables takes place.
variables = {
    'AWS_ACCESS_KEY_ID': {'type': 'string', 'required': True},
    'AWS_SECRET_ACCESS_KEY': {'type': 'string', 'required': True},
    'AWS_DEFAULT_REGION': {'type': 'string', 'required': True},
    'DISTRIBUTION_ID': {'type': 'string', 'required': True},
    'PATHS': {'type': 'string', 'required': False, 'nullable': True, 'default': '/*'},
    'DEBUG': {'type': 'boolean', 'required': False, 'default': False}
}

# initialize the Pipe object. At this stage the validation of variables takes place
pipe = Pipe(schema=variables)

distribution_id = pipe.get_variable('DISTRIBUTION_ID')

# PATHS is a whitespace-separated list of paths to invalidate. The schema
# declares it nullable, so guard against None (which has no .split()) and
# against a blank value (which would produce an empty batch) by falling back
# to the schema default.
raw_paths = pipe.get_variable('PATHS') or ''
paths = raw_paths.split() or variables['PATHS']['default'].split()

client = boto3.client('cloudfront')

# log the INFO message
pipe.log_info('Sending a cloudfront invalidation request')

# create a unique caller id from BITBUCKET_REPO_FULL_NAME and BITBUCKET_BUILD_NUMBER
# see https://confluence.atlassian.com/bitbucket/variables-in-pipelines-794502608.html
# to get the list of all available environment variables in pipelines
caller = f"{os.getenv('BITBUCKET_REPO_FULL_NAME')}:{os.getenv('BITBUCKET_BUILD_NUMBER')}"

try:
    client.create_invalidation(
        DistributionId=distribution_id,
        InvalidationBatch={
            'Paths': {
                # Quantity has to match the number of entries in Items;
                # a hard-coded 1 breaks any request with several paths.
                'Quantity': len(paths),
                'Items': paths,
            },
            'CallerReference': caller,
        },
    )
except Exception as error:
    # log the ERROR message (in red)
    pipe.log_error('Error creating a cloudfront invalidation')
    # print a colorized error message and call system exit
    pipe.fail(f"Failed to create a cloudfront invalidation: {error}")
else:
    # print a colorized success message (in green); kept outside the try body
    # so only a failure of the API call itself is reported as one.
    pipe.success('Successfully created a cloudfront invalidation')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.