Delete bucket objects and remove them from CloudFlare's cache, parallelised on prefixes.
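For example (the bucket and zone names here are hypothetical), a dry run over two prefixes might look like:

    ./purge.py -b assets.example.com -p images/,css/ -z example.com --dryrun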
#!/usr/bin/env python
""" | |
Depends on boto and pyflare (install with `pip install boto pyflare`). | |
usage: purge.py [-h] -b BUCKET [-p PREFIX] -z ZONE [-d] | |
Parallellized S3 object and CloudFlare cache purger. | |
optional arguments: | |
-h, --help show this help message and exit | |
-b BUCKET, --bucket BUCKET | |
Bucket to operate on. | |
-p PREFIX, --prefix PREFIX | |
Key prefix(es). | |
-z ZONE, --zone ZONE CloudFlare zone. | |
-d, --dryrun Reports what changes would occur, but does not perform | |
them. | |
Also, you'll need to set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, | |
CLOUDFLARE_USER, and CLOUDFLARE_KEY as environment variables. | |
""" | |
import os
import time
import signal
import argparse
from multiprocessing import Event
from multiprocessing import Process

import boto
from boto.s3.connection import S3Connection
from boto.s3.connection import OrdinaryCallingFormat
from pyflare import PyflareClient, APIError

# Event for signalling across processes.
shutdown = Event()
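# Because the workers are forked from this process, they inherit this Event
# (and the signal handlers registered below), so a single Ctrl+C or SIGTERM
# can stop every worker cooperatively.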

# Each process should output how much work it's done
# every HEARTBEAT keys.
HEARTBEAT = 100


# Triggers the shutdown event on receipt of a signal.
def signal_handler(signum, frame):
    shutdown.set()


# Exit with an error.
def error_out(message):
    print(message)
    exit(1)


# Register some signals with our shutdown handler.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGQUIT, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

# Create a command line parser for the variable options.
# We'll collect the credentials via environment variables.
parser = argparse.ArgumentParser(description='Parallelized S3 object and CloudFlare cache purger.',
                                 epilog="Also, you'll need to set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, "
                                        "CLOUDFLARE_USER, and CLOUDFLARE_KEY as environment variables.")
parser.add_argument('-b', '--bucket', required=True,
                    help='Bucket to operate on.')
parser.add_argument('-p', '--prefix', default='', required=False,
                    help='Key prefix(es).')
parser.add_argument('-r', '--delete', action='store_true',
                    help='Delete keys from bucket during purge.')
parser.add_argument('-z', '--zone', required=True,
                    help='CloudFlare zone.')
parser.add_argument('-d', '--dryrun', action='store_true',
                    help='Reports what changes would occur, but does not perform them.')
opts = parser.parse_args()

# S3 bucket; just the bucket name is fine.
bucket = opts.bucket
# Split a comma-separated list of prefixes into a list.
prefixes = [p.strip() for p in opts.prefix.split(',')]
# Whether to delete keys from the bucket.
delete_keys = opts.delete
zone = opts.zone
# Dry run is useful for confirming that the change you want
# is the change that will happen. Test with it first and verify the results.
dry_run = opts.dryrun

# Environment variables
access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
if not access_key_id:
    error_out("Missing AWS_ACCESS_KEY_ID environment variable.")

secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
if not secret_access_key:
    error_out("Missing AWS_SECRET_ACCESS_KEY environment variable.")

cloudflare_user = os.getenv("CLOUDFLARE_USER")
if not cloudflare_user:
    error_out("Missing CLOUDFLARE_USER environment variable.")

cloudflare_key = os.getenv("CLOUDFLARE_KEY")
if not cloudflare_key:
    error_out("Missing CLOUDFLARE_KEY environment variable.")

if opts.prefix:
    print("Processing with prefixes: %s" % ",".join(prefixes))
else:
    print("WARNING: no prefixes were specified, so no parallelization.")
    print("If you'd like to change that, hit Ctrl+C.")

timeout = 5
while not shutdown.is_set():
    if timeout <= 0:
        break
    print(timeout)
    time.sleep(1)
    timeout -= 1

if shutdown.is_set():
    print("Stopping.")
    exit()

if dry_run:
    print("Beginning a DRY RUN (no data will be modified)...")
def worker(prefix):
    print("Starting worker for prefix '%s'" % prefix)
    try:
        c = S3Connection(access_key_id, secret_access_key,
                         calling_format=OrdinaryCallingFormat())
        b = c.get_bucket(bucket)
    except Exception as e:
        print("Error (worker '%s'): %s" % (prefix, e))
        return
    cf = PyflareClient(cloudflare_user, cloudflare_key)
    schemas = ['http', 'https']
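    # CloudFlare caches the http and https variants of a URL separately, so
    # both need to be purged. Note the purge URL uses the bucket name as the
    # hostname, which assumes the bucket is named for the domain it serves.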
    # Start a counter so we can offer feedback.
    count = 0
    for k in b.list(prefix=prefix):
        if shutdown.is_set():
            # Interrupt on a shutdown signal.
            return
        if not dry_run:
            # Delete the key from the bucket.
            if delete_keys:
                b.delete_key(k.name)
            for schema in schemas:
                url = "%s://%s/%s" % (schema, bucket, k.name)
                print("Purging %s..." % url)
                while True:
                    try:
                        cf.zone_file_purge(zone, url)
                        break
                    except APIError:
                        wait_for = 65
                        print("Waiting %d seconds for CloudFlare rate limit" % wait_for)
                        time.sleep(wait_for)
        else:
            if delete_keys:
                print("DRYRUN: deleting %s/%s" % (bucket, k.name))
            for schema in schemas:
                url = "%s://%s/%s" % (schema, bucket, k.name)
                print("DRYRUN: purging %s..." % url)
        count += 1
        if count % HEARTBEAT == 0:
            print("%d keys processed for %s..." % (count, prefix))
    print("Finished processing for %s (%d keys)" % (prefix, count))


# Put all our jobs in a list so we can terminate them cleanly at the end of the run.
jobs = []
for prefix in prefixes:
    p = Process(target=worker, args=(prefix,))
    jobs.append(p)
    p.start()
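
# Polling join() with a short timeout, rather than blocking indefinitely,
# keeps the parent responsive to shutdown signals while the workers run.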
# Intermittently check for which processes have finished executing.
while not shutdown.is_set():
    alive = False
    for job in jobs:
        # Check with a one second timeout.
        job.join(1)
        if job.is_alive():
            alive = True
    # If at least one process is alive, keep running.
    if not alive:
        shutdown.set()

print("Finished all jobs.")