Skip to content

Instantly share code, notes, and snippets.

@csereno
Last active September 18, 2019 18:02
Show Gist options
  • Save csereno/cadcdf336eaaab4b304f12077dc60a48 to your computer and use it in GitHub Desktop.
Save csereno/cadcdf336eaaab4b304f12077dc60a48 to your computer and use it in GitHub Desktop.
Clear CloudFlare Cache with Lambda
###############
# Purges all cache in CloudFlare from AWS Lambda
# I use this script when GitHub pushes new content to my S3 bucket
# Requirements:
# * CloudFlare API Global Key saved in the Parameter Store as 'CloudFlare'
# * Environment variables:
# ** 'email' - the email address of your CloudFlare account
# ** 'cf_api' - the CloudFlare API URL the purge request is POSTed to
# ** 'topic' - the ARN of the SNS topic that receives notifications
# CloudFlare Doc: https://api.cloudflare.com/#zone-purge-all-files
#################
import json, os, boto3
import traceback
import logging
from botocore.vendored import requests
from boto3.session import Session
# Module-level setup: this runs once when the module is imported, so the
# clients and the decrypted key are shared by every call to the handler.

# Read the CloudFlare API key from the Systems Manager Parameter Store entry
# named 'CloudFlare', asking SSM to decrypt the value.
Params_Client = boto3.client('ssm')
_cloudflare_parameter = Params_Client.get_parameter(
    Name='CloudFlare',
    WithDecryption=True,
)
CLOUDFLARE = _cloudflare_parameter['Parameter']['Value']

# SNS client plus the topic (taken from the 'topic' environment variable)
# that notifications are published to.
SNS_Client = boto3.client('sns')
SNSTopic = os.environ['topic']
# Main Function
def lambda_handler(event, context):
    """Purge the whole CloudFlare cache and report the outcome to CodePipeline.

    The function POSTs a ``purge_everything`` request to the URL held in the
    ``cf_api`` environment variable, authenticating with the ``email``
    environment variable and the module-level ``CLOUDFLARE`` API key. The
    outcome is published through ``sendtext`` and reported back to
    CodePipeline for the job that triggered the invocation.

    Args:
        event: Invocation payload; the job id is read from
            ``event['CodePipeline.job']['id']``.
        context: Lambda context object (unused).

    Returns:
        The decoded JSON body of the CloudFlare response on success, or the
        formatted traceback string when anything in the purge flow failed.

    Raises:
        KeyError: If ``event`` does not carry a CodePipeline job id (raised
            before any result can be reported to CodePipeline).
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.debug(event)
    codepipeline = boto3.client('codepipeline')
    job_id = event['CodePipeline.job']['id']
    try:
        logger.info('Clearing CloudFlare cache...')
        url = os.environ['cf_api']
        headers = {
            # The account email comes from the 'email' environment variable;
            # the original code referenced an undefined EMAIL name here.
            "X-Auth-Email": os.environ['email'],
            "X-Auth-Key": CLOUDFLARE,
            "Content-Type": "application/json",
        }
        data = '{"purge_everything":true}'
        # NOTE(review): `requests` is imported from botocore.vendored at the
        # top of the file; newer botocore releases no longer ship that copy,
        # so the import may need replacing - confirm against the runtime.
        # A timeout keeps a stalled connection from consuming the whole
        # Lambda time budget without ever reporting a job result.
        r = requests.post(url, headers=headers, data=data, timeout=10)
        result = r.json()
        logger.info('CloudFlare response: %s', result)
        # CloudFlare signals a rejected request in the JSON body, so the body
        # has to be inspected before the purge is announced as done.
        if not (isinstance(result, dict) and result.get('success')):
            raise RuntimeError('CloudFlare purge request failed: %s' % (result,))
        sendtext("Cache Cleared")
        response = codepipeline.put_job_success_result(jobId=job_id)
        logger.debug(response)
        return result
    except Exception as error:
        logger.exception(error)
        details = traceback.format_exc()
        try:
            sendtext("Cache Clear Failed:" + details)
        except Exception:
            # The notification is best effort: CodePipeline must still be
            # told about the failure, otherwise the job waits for a timeout.
            logger.exception('Could not publish the failure notification')
        response = codepipeline.put_job_failure_result(
            jobId=job_id,
            failureDetails={
                'type': 'JobFailed',
                # CodePipeline caps the message at 5000 characters; keep the
                # tail of the traceback because it names the actual error.
                'message': details[-5000:],
            },
        )
        logger.debug(response)
        return details
# Send Text
def sendtext(message):
    """Publish ``message`` to the configured SNS topic.

    The notification is sent through the module-level ``SNS_Client`` to
    ``SNSTopic`` with the fixed subject "CloudFlare Cache".

    Args:
        message: Text used as the body of the notification.
    """
    publish_arguments = {
        "TopicArn": SNSTopic,
        "Subject": "CloudFlare Cache",
        "Message": message,
    }
    SNS_Client.publish(**publish_arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment