@sunahsuh · Created December 21, 2018 15:33
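"""Scan an S3 bucket for date-partitioned prefixes whose Parquet part
files were written under more than one job ID, which suggests the
partition contains duplicated output from multiple job runs."""
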
import boto3
import click
import re
import threading
from queue import Queue

class DateMatchWorker(threading.Thread):
    """Walks the prefix tree under root_key and pushes prefixes whose date
    component matches search_date onto match_queue."""

    def __init__(self, client, target_bucket, prefix_queue, match_queue,
                 search_date, date_regexes, skip_prefixes, *args, **kwargs):
        super(DateMatchWorker, self).__init__(*args, **kwargs)
        self.client = client
        self.target_bucket = target_bucket
        self.prefix_queue = prefix_queue
        self.match_queue = match_queue
        self.date_regexes = date_regexes
        self.search_date = search_date
        self.skip_prefixes = skip_prefixes
    def run(self):
        while True:
            prefix = self.prefix_queue.get()
            if prefix is None:
                break
            # Skip prefixes we were told to ignore.
            if any(p in prefix for p in self.skip_prefixes):
                print("skipping {}".format(prefix))
                self.prefix_queue.task_done()
                continue
            # Try each date regex until one matches this prefix.
            regex_result = None
            for r in self.date_regexes:
                regex_result = re.search(r, prefix)
                if regex_result is not None:
                    break
            try:
                if regex_result is None:
                    print("traversing {}".format(prefix))
                    # No date component here yet: list one level deeper and
                    # queue each sub-prefix for a worker to inspect.
                    ret = self.client.list_objects_v2(
                        Bucket=self.target_bucket, Prefix=prefix, Delimiter="/")
                    resp_code = ret['ResponseMetadata']['HTTPStatusCode']
                    if resp_code != 200:
                        print("Got response code {} on prefix {}".format(resp_code, prefix))
                        # Mark the item done so prefix_queue.join() can't hang.
                        self.prefix_queue.task_done()
                        continue
                    prefixes = ret.get("CommonPrefixes", [])
                    try:
                        while ret["IsTruncated"]:
                            ret = self.client.list_objects_v2(
                                Bucket=self.target_bucket, Prefix=prefix, Delimiter="/",
                                ContinuationToken=ret["NextContinuationToken"])
                            prefixes += ret["CommonPrefixes"]
                    except KeyError:
                        pass
                    for item in prefixes:
                        self.prefix_queue.put(item["Prefix"])
                elif regex_result.group(1) == self.search_date:
                    print("matching prefix: {}".format(prefix))
                    self.match_queue.put(prefix)
                # A matching regex with a non-matching date is a no-op.
            except IndexError:
                print("Index error on prefix '{}': match: {}".format(prefix, regex_result.group(0)))
            self.prefix_queue.task_done()

class DupeJobIdWorker(threading.Thread):
    """Lists every object under a matched prefix and reports prefixes whose
    part files carry more than one distinct job ID."""

    def __init__(self, client, match_queue, target_bucket, job_id_regex, *args, **kwargs):
        super(DupeJobIdWorker, self).__init__(*args, **kwargs)
        self.client = client
        self.match_queue = match_queue
        self.target_bucket = target_bucket
        self.job_id_regex = job_id_regex
    def run(self):
        while True:
            prefix = self.match_queue.get()
            if prefix is None:
                break
            print("Checking {}".format(prefix))
            ret = self.client.list_objects_v2(Bucket=self.target_bucket, Prefix=prefix)
            resp_code = ret['ResponseMetadata']['HTTPStatusCode']
            if resp_code != 200:
                print("Got response code {} on prefix {}".format(resp_code, prefix))
                self.match_queue.task_done()
                continue
            try:
                matches = [re.search(self.job_id_regex, item['Key']) for item in ret['Contents']]
            except KeyError:
                # A missing 'Contents' key means the prefix holds no objects.
                print("KeyError while processing {}".format(prefix))
                self.match_queue.task_done()
                continue
            try:
                while ret["IsTruncated"]:
                    ret = self.client.list_objects_v2(
                        Bucket=self.target_bucket, Prefix=prefix,
                        ContinuationToken=ret["NextContinuationToken"])
                    matches += [re.search(self.job_id_regex, item['Key']) for item in ret['Contents']]
            except KeyError:
                pass
            # More than one distinct captured job ID means duplicated output.
            job_ids = set(match.group(1) for match in matches if match is not None)
            if len(job_ids) > 1:
                print("MULTIPLE JOB IDS FOUND: {}".format(prefix))
            self.match_queue.task_done()
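
# The default --job_id_pattern below appears aimed at Spark-style part file
# names such as part-00000-<uuid>-c000.snappy.parquet (a hypothetical
# example), capturing the job's UUID between the part number and the
# extension.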

@click.command()
@click.argument('target_bucket')
@click.argument('root_key')
@click.argument('search_date')
@click.option('--date_pattern', multiple=True)
@click.option('--skip_prefixes', multiple=True)
@click.option('--job_id_pattern', default=r'part-\d+-([\w-]+)[\.\w]*\.parquet')
@click.option('--num_threads', default=5)
def find_dupe_job_ids(target_bucket, root_key, search_date, date_pattern, skip_prefixes, job_id_pattern, num_threads):
    if len(date_pattern) == 0:
        # Default patterns cover common date-partitioned layouts,
        # e.g. =2018-12-20/, /v20181220/, cutoffs-2018-12-20/, /20181220/.
        date_regexes = [r"=([\d-]{8,})/",
                        r"/v([\d_]{8,})/",
                        r"cutoffs-([\d-]+)/",
                        r"/([\d]{8,})/"]
    else:
        date_regexes = [re.compile(pattern) for pattern in date_pattern]
    if len(skip_prefixes) == 0:
        skip_prefixes = ['aggregates_poc/', 'logs/']
    client = boto3.client('s3')
    job_id_regex = re.compile(job_id_pattern)
    prefix_queue = Queue()
    prefix_queue.put(root_key)
    match_queue = Queue()

    # Phase 1: walk the prefix tree and collect date-matching prefixes.
    threads = []
    for i in range(num_threads):
        t = DateMatchWorker(client, target_bucket, prefix_queue, match_queue,
                            search_date, date_regexes, skip_prefixes)
        t.start()
        threads.append(t)
    prefix_queue.join()
    for i in range(num_threads):
        prefix_queue.put(None)  # one sentinel per worker to shut it down
    for t in threads:
        t.join()

    # Phase 2: check each matching prefix for multiple job IDs.
    dupe_threads = []
    for i in range(num_threads):
        t = DupeJobIdWorker(client, match_queue, target_bucket, job_id_regex)
        t.start()
        dupe_threads.append(t)
    match_queue.join()
    for i in range(num_threads):
        match_queue.put(None)
    for t in dupe_threads:
        t.join()


if __name__ == '__main__':
    find_dupe_job_ids()
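
# Example invocation (the bucket and key names here are hypothetical):
#
#   python find_dupe_job_ids.py my-data-bucket main_summary/ 20181220 \
#       --num_threads 10 --skip_prefixes logs/
#
# search_date must use the same format the date regex captures from the
# prefix, e.g. "20181220" for /20181220/-style layouts or "2018-12-20"
# for key=2018-12-20/-style layouts.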