Created
December 21, 2018 15:33
-
-
Save sunahsuh/ab87a26ebf5062b66590f5ae029c0408 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3
import click
import re
import threading
from time import sleep
from queue import Queue
# NOTE: the original also had `from itertools import ifilter`, which exists
# only on Python 2 and raises ImportError here (this file is Python 3: it
# uses the `queue` module and print()).  The name was never used in the
# file, so the broken import is dropped.
class DateMatchWorker(threading.Thread):
    """Worker thread that walks an S3 prefix tree breadth-first.

    Pulls prefixes off ``prefix_queue``.  A prefix matching one of
    ``date_regexes`` whose captured group equals ``search_date`` is pushed
    onto ``match_queue``; a prefix matching no regex is expanded one level
    via ``list_objects_v2`` and its child prefixes are re-queued.  A
    ``None`` item is the shutdown sentinel (queued by the driver after
    ``prefix_queue.join()`` returns).
    """

    def __init__(self, client, target_bucket, prefix_queue, match_queue,
                 search_date, date_regexes, skip_prefixes, *args, **kwargs):
        super(DateMatchWorker, self).__init__(*args, **kwargs)
        self.client = client                # boto3 S3 client, shared across workers
        self.target_bucket = target_bucket  # bucket name to list
        self.prefix_queue = prefix_queue    # work queue of prefixes to inspect
        self.match_queue = match_queue      # output queue of date-matching prefixes
        self.date_regexes = date_regexes    # patterns with one capture group each
        self.search_date = search_date      # date string the capture must equal
        self.skip_prefixes = skip_prefixes  # substrings that blacklist a prefix

    def run(self):
        while True:
            prefix = self.prefix_queue.get()
            if prefix is None:
                # Shutdown sentinel; no task_done() on purpose — the driver
                # only join()s before the sentinels are queued.
                break
            # Skip any prefix containing a blacklisted substring.  (The
            # original raised/caught a throwaway SkipPrefix exception class
            # re-defined every iteration just for control flow.)
            skipped = next((p for p in self.skip_prefixes if p in prefix), None)
            if skipped is not None:
                print("found {}".format(skipped))
                self.prefix_queue.task_done()
                continue
            # First regex that matches wins; None means no date component was
            # recognized and we should traverse one level deeper.
            # BUGFIX: initialize to None so an empty date_regexes list no
            # longer leaves regex_result unbound (NameError).
            regex_result = None
            for r in self.date_regexes:
                regex_result = re.search(r, prefix)
                if regex_result is not None:
                    break
            try:
                if regex_result is None:
                    print("traversing {}".format(prefix))
                    # traverse this prefix
                    ret = self.client.list_objects_v2(
                        Bucket=self.target_bucket, Prefix=prefix, Delimiter="/")
                    resp_code = ret['ResponseMetadata']['HTTPStatusCode']
                    if resp_code != 200:
                        print("Got response code {} on prefix {}".format(resp_code, prefix))
                        # BUGFIX: the original `break` exited the thread
                        # without task_done(), so prefix_queue.join() in the
                        # driver deadlocked.  Mark the item done and keep the
                        # worker alive for the remaining prefixes.
                        self.prefix_queue.task_done()
                        continue
                    # BUGFIX: use .get() defaults so a page without
                    # CommonPrefixes no longer raises KeyError and silently
                    # aborts the remaining continuation pages.
                    prefixes = list(ret.get("CommonPrefixes", []))
                    while ret.get("IsTruncated"):
                        ret = self.client.list_objects_v2(
                            Bucket=self.target_bucket, Prefix=prefix, Delimiter="/",
                            ContinuationToken=ret["NextContinuationToken"])
                        prefixes += ret.get("CommonPrefixes", [])
                    for item in prefixes:
                        self.prefix_queue.put(item["Prefix"])
                elif regex_result.group(1) == self.search_date:
                    print("matching prefix: {}".format(prefix))
                    self.match_queue.put(prefix)
            except IndexError:
                # A date regex without a capturing group matched.
                print("Index error on prefix '{}': match: {}".format(prefix, regex_result.group(0)))
            # a matching regex but a non-matching date is a no-op
            self.prefix_queue.task_done()
class DupeJobIdWorker(threading.Thread):
    """Worker thread that scans one matched prefix for duplicate job ids.

    For each prefix from ``match_queue`` it lists every object under that
    prefix, extracts a job id from each key via ``job_id_regex`` (group 1),
    and reports the prefix when more than one distinct id is found.  A
    ``None`` item is the shutdown sentinel.
    """

    def __init__(self, client, match_queue, target_bucket, job_id_regex, *args, **kwargs):
        super(DupeJobIdWorker, self).__init__(*args, **kwargs)
        self.client = client                # boto3 S3 client, shared across workers
        self.match_queue = match_queue      # queue of date-matching prefixes to scan
        self.target_bucket = target_bucket  # bucket name to list
        self.job_id_regex = job_id_regex    # compiled pattern; group 1 is the job id

    def run(self):
        while True:
            prefix = self.match_queue.get()
            if prefix is None:
                break
            print("Checking {}".format(prefix))
            ret = self.client.list_objects_v2(Bucket=self.target_bucket, Prefix=prefix)
            resp_code = ret['ResponseMetadata']['HTTPStatusCode']
            if resp_code != 200:
                print("Got response code {} on prefix {}".format(resp_code, prefix))
                # BUGFIX: the original `break` exited the thread without
                # task_done(), deadlocking match_queue.join() in the driver.
                self.match_queue.task_done()
                continue
            try:
                matches = [re.search(self.job_id_regex, item['Key']) for item in ret['Contents']]
            except KeyError:
                print("KeyError while processing {}".format(prefix))
                self.match_queue.task_done()
                # BUGFIX: the original ended this branch with a bare `next`,
                # which is a no-op expression, not `continue`; execution fell
                # through, `matches` was unbound (NameError) and task_done()
                # would have been called a second time (ValueError).
                continue
            # BUGFIX: .get() defaults so a page missing Contents/IsTruncated
            # no longer aborts pagination via a silently-swallowed KeyError.
            while ret.get("IsTruncated"):
                ret = self.client.list_objects_v2(
                    Bucket=self.target_bucket, Prefix=prefix,
                    ContinuationToken=ret["NextContinuationToken"])
                matches += [re.search(self.job_id_regex, item['Key'])
                            for item in ret.get('Contents', [])]
            job_ids = set(match.group(1) for match in matches if match is not None)
            if len(job_ids) > 1:
                print("MULTIPLE JOB IDS FOUND: {}".format(prefix))
            self.match_queue.task_done()
@click.command()
@click.argument('target_bucket')
@click.argument('root_key')
@click.argument('search_date')
@click.option('--date_pattern', multiple=True)
@click.option('--skip_prefixes', multiple=True)
@click.option('--job_id_pattern', default=r'part-\d+-([\w-]+)[\.\w]*\.parquet')
@click.option('--num_threads', default=5)
def find_dupe_job_ids(target_bucket, root_key, search_date, date_pattern, skip_prefixes, job_id_pattern, num_threads):
    """Report prefixes under ROOT_KEY in TARGET_BUCKET whose date component
    equals SEARCH_DATE and whose objects carry more than one distinct job id.

    Phase 1 fans DateMatchWorker threads out over the prefix tree to collect
    date-matching prefixes; phase 2 fans DupeJobIdWorker threads out over
    those matches.  Results are printed, not returned.
    """
    if not date_pattern:
        # Default date layouts observed in the bucket; each pattern has
        # exactly one capture group.  Compiled up front, consistent with the
        # user-supplied branch below (the original left these as raw strings).
        date_regexes = [re.compile(r"=([\d-]{8,})/"),
                        re.compile(r"/v([\d_]{8,})/"),
                        re.compile(r"cutoffs-([\d-]+)/"),
                        re.compile(r"/([\d]{8,})/")]
    else:
        date_regexes = [re.compile(pattern) for pattern in date_pattern]
    if not skip_prefixes:
        skip_prefixes = ['aggregates_poc/', 'logs/']
    client = boto3.client('s3')
    job_id_regex = re.compile(job_id_pattern)
    prefix_queue = Queue()
    prefix_queue.put(root_key)
    match_queue = Queue()

    # Phase 1: breadth-first traversal collecting date-matching prefixes.
    threads = []
    for _ in range(num_threads):
        t = DateMatchWorker(client, target_bucket, prefix_queue, match_queue,
                            search_date, date_regexes, skip_prefixes)
        t.start()
        threads.append(t)
    prefix_queue.join()            # blocks until every queued prefix is task_done()
    for _ in range(num_threads):
        prefix_queue.put(None)     # one shutdown sentinel per worker
    for t in threads:
        t.join()

    # Phase 2: scan each matching prefix for duplicate job ids.
    dupe_threads = []
    for _ in range(num_threads):
        t = DupeJobIdWorker(client, match_queue, target_bucket, job_id_regex)
        t.start()
        dupe_threads.append(t)
    match_queue.join()
    for _ in range(num_threads):
        match_queue.put(None)
    for t in dupe_threads:
        t.join()


if __name__ == '__main__':
    find_dupe_job_ids()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment