Skip to content

Instantly share code, notes, and snippets.

@duggan
Last active Aug 29, 2015
Embed
What would you like to do?
Delete bucket objects and remove them from CloudFlare's cache, parallelised on prefixes.
#!/usr/bin/env python
"""
Depends on boto and pyflare (install with `pip install boto pyflare`).
usage: purge.py [-h] -b BUCKET [-p PREFIX] [-r] -z ZONE [-d]
Parallellized S3 object and CloudFlare cache purger.
optional arguments:
-h, --help show this help message and exit
-b BUCKET, --bucket BUCKET
Bucket to operate on.
-p PREFIX, --prefix PREFIX
Key prefix(es).
-r, --delete Delete keys from bucket during purge.
-z ZONE, --zone ZONE CloudFlare zone.
-d, --dryrun Reports what changes would occur, but does not perform
them.
Also, you'll need to set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
CLOUDFLARE_USER, and CLOUDFLARE_KEY as environment variables.
"""
import argparse
import os
import signal
import sys
import time
from multiprocessing import Event
from multiprocessing import Process

import boto
from boto.s3.connection import OrdinaryCallingFormat
from boto.s3.connection import S3Connection
from pyflare import PyflareClient, APIError
# Cross-process shutdown flag: set by the signal handler (and by the main
# loop once every job has exited), polled by the workers and the main loop.
shutdown = Event()
# Each worker prints a progress line every HEARTBEAT keys it has processed.
HEARTBEAT = 100
# Signal callback shared by every signal registered in this script.
def signal_handler(x, y):
    """Set the shared ``shutdown`` event.

    The two positional arguments are whatever the ``signal`` module passes
    to a handler; neither is used here.
    """
    shutdown.set()
# Exit with an error.
def error_out(message):
    """Report ``message`` on stderr and terminate with exit status 1.

    Uses ``sys.exit`` rather than the ``exit`` helper injected by the
    ``site`` module, which is intended for interactive sessions and is not
    guaranteed to exist (e.g. when running with ``python -S``).

    Raises:
        SystemExit: always, with code 1.
    """
    # Diagnostics belong on stderr so they are not mixed into normal output.
    sys.stderr.write("%s\n" % message)
    sys.exit(1)
# Route termination, quit and interrupt signals through the shutdown handler.
for _signum in (signal.SIGTERM, signal.SIGQUIT, signal.SIGINT):
    signal.signal(_signum, signal_handler)
# Command line interface for the per-run options.
# Credentials are deliberately not options; they come from the environment.
parser = argparse.ArgumentParser(
    description='Parallellized S3 object and CloudFlare cache purger.',
    epilog="Also, you'll need to set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, "
           "CLOUDFLARE_USER, and CLOUDFLARE_KEY as environment variables.",
)
# (flags, keyword arguments) for each option, registered in this order.
_OPTION_TABLE = (
    (('-b', '--bucket'),
     dict(required=True, help='Bucket to operate on.')),
    (('-p', '--prefix'),
     dict(default='', required=False, help='Key prefix(es).')),
    (('-r', '--delete'),
     dict(action='store_true', help='Delete keys from bucket during purge.')),
    (('-z', '--zone'),
     dict(required=True, help='CloudFlare zone.')),
    (('-d', '--dryrun'),
     dict(action='store_true',
          help='Reports what changes would occur, but does not perform them.')),
)
for _flags, _settings in _OPTION_TABLE:
    parser.add_argument(*_flags, **_settings)
opts = parser.parse_args()
# S3 bucket, just the bucket name is fine
bucket = opts.bucket
# Split the comma separated --prefix value into a list of prefixes.
# Empty entries (from input such as "a," or "a,,b") are dropped: an empty
# prefix matches every key in the bucket, so keeping one would silently
# start a worker over the whole bucket. Duplicates are dropped so two
# workers never process the same prefix.
prefixes = []
for _raw_prefix in opts.prefix.split(','):
    _raw_prefix = _raw_prefix.strip()
    if _raw_prefix and _raw_prefix not in prefixes:
        prefixes.append(_raw_prefix)
# With no usable prefix, fall back to a single worker over the whole bucket.
if not prefixes:
    prefixes = ['']
# Delete keys from bucket
delete_keys = opts.delete
zone = opts.zone
# Dry run can be useful for making sure the change you want to happen
# is what will happen. Suggest testing first though and verifying results.
dry_run = opts.dryrun
# Credentials are read from the environment; each one is mandatory.
def _require_env(name):
    """Return the value of environment variable ``name``, or exit if unset/empty."""
    value = os.getenv(name)
    if not value:
        error_out("Missing %s environment variable." % name)
    return value


access_key_id = _require_env("AWS_ACCESS_KEY_ID")
secret_access_key = _require_env("AWS_SECRET_ACCESS_KEY")
cloudflare_user = _require_env("CLOUDFLARE_USER")
cloudflare_key = _require_env("CLOUDFLARE_KEY")
# Announce what will be processed. The check looks at the parsed prefixes
# rather than the raw option string, so a --prefix made up only of
# whitespace or commas still gets the whole-bucket warning and countdown.
if any(prefixes):
    print("Processing with prefixes: %s" % ",".join(prefixes))
else:
    print("WARNING: no prefixes were specified, so no parallellization.")
    print("If you'd like to change that, hit Ctrl+C.")
    # Five second grace period; a signal sets `shutdown` and aborts the run.
    for remaining in range(5, 0, -1):
        if shutdown.is_set():
            break
        print(remaining)
        time.sleep(1)
    if shutdown.is_set():
        print("Stopping.")
        # sys.exit instead of the interactive-only `exit` helper from `site`.
        sys.exit()
if dry_run:
    print("Beginning a DRY RUN (no data will be modified)...")
def _purge_url(cf, url, wait_for=65):
    """Purge ``url`` from the CloudFlare cache, retrying on API errors.

    Retries indefinitely, pausing ``wait_for`` seconds between attempts,
    but gives up as soon as the shared ``shutdown`` event is set.

    Returns:
        True once the purge call succeeded, False if shutdown was requested
        while waiting to retry.
    """
    while True:
        try:
            cf.zone_file_purge(zone, url)
        except APIError as e:
            # Show the actual error: not every APIError is a rate limit.
            print("CloudFlare API error (%s)" % e)
            print("Waiting %d seconds for CloudFlare rate limit" % wait_for)
            # Sleep in one second slices so a shutdown request is noticed
            # promptly instead of after the full wait.
            for _ in range(wait_for):
                if shutdown.is_set():
                    return False
                time.sleep(1)
        else:
            return True


def worker(prefix):
    """Process every key in the bucket whose name starts with ``prefix``.

    For each key the http and https URLs (built as
    ``<schema>://<bucket>/<key name>``) are purged from the CloudFlare zone,
    and the key is deleted from the bucket first when ``delete_keys`` is
    set. In dry-run mode the same actions are only printed. Progress is
    reported every ``HEARTBEAT`` keys, and the loop stops early, without the
    final summary line, once the ``shutdown`` event is set.

    Args:
        prefix: key name prefix handled by this worker; an empty string
            matches every key in the bucket.
    """
    print("Starting worker for prefix '%s'" % prefix)
    try:
        c = S3Connection(access_key_id, secret_access_key,
                         calling_format=OrdinaryCallingFormat())
        b = c.get_bucket(bucket)
    except Exception as e:
        # Connection or bucket lookup failed: report and end this worker only.
        print("Error (worker '%s'): %s" % (prefix, e))
        return
    cf = PyflareClient(cloudflare_user, cloudflare_key)
    schemas = ['http', 'https']
    # Start a counter so we can offer feedback
    count = 0
    for k in b.list(prefix=prefix):
        if shutdown.is_set():
            # Interrupt on a shutdown signal
            return
        urls = ["%s://%s/%s" % (schema, bucket, k.name) for schema in schemas]
        if dry_run:
            # Only announce a deletion when one would actually happen.
            if delete_keys:
                print("DRYRUN: deleting %s/%s" % (bucket, k.name))
            for url in urls:
                print("Purging %s..." % url)
        else:
            # Deletes the key from the bucket
            if delete_keys:
                b.delete_key(k)
            for url in urls:
                print("Purging %s..." % url)
                if not _purge_url(cf, url):
                    # Shutdown requested while waiting to retry.
                    return
        count += 1
        if count % HEARTBEAT == 0:
            print("%d keys processed for %s..." % (count, prefix))
    print("Finished processing for %s (%d keys)" % (prefix, count))
# Start one worker process per prefix and keep the handles so we can wait
# for every one of them before reporting completion.
jobs = []
for prefix in prefixes:
    p = Process(target=worker, args=(prefix,))
    jobs.append(p)
    p.start()
# Poll the workers until they have all exited or a shutdown is requested.
all_done = False
while not shutdown.is_set():
    for job in jobs:
        # Wait on each job with a one second timeout so the shutdown flag
        # is re-checked regularly.
        job.join(1)
    # If at least one process is alive, keep running.
    if not any(job.is_alive() for job in jobs):
        all_done = True
        shutdown.set()
if not all_done:
    print("Shutdown requested, waiting for workers to stop...")
# Workers stop once they observe the shutdown event; join them so the
# final message is only printed after every process has really exited.
for job in jobs:
    job.join()
print("Finished all jobs.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment