Created
April 5, 2018 00:01
-
-
Save sunahsuh/3ff14afd55d6e45afbf48c0c0032010b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from datetime import datetime, timedelta | |
import argparse | |
import threading | |
import Queue | |
import subprocess | |
# Date pattern (YYYYMMDD) shared by the command-line date parsing and by the
# submission_date_s3 partition value embedded in every S3 key.
fmt = '%Y%m%d'

# Bucket that partitions are copied from.
FROM_PATH = 'telemetry-parquet'

# Bucket plus key prefix that partitions are copied to.
TO_PATH = 'telemetry-test-bucket/ssuh/copy_tests'
def do_sync(q):
    """Worker loop: run queued commands forever, one at a time.

    Each item taken from *q* is an argument list that is handed to
    ``subprocess.call``. The function never returns, so it is meant to be
    the target of a daemon thread.

    Every item is acknowledged with ``q.task_done()`` even when the command
    cannot be started or exits with a non-zero status; otherwise a single
    failure would leave ``q.join()`` in the producer blocked forever.
    Failures are reported on stderr and the worker keeps consuming.
    """
    # Function-scope import so this block does not depend on a module-level
    # ``import sys``.
    import sys

    while True:
        cmd = q.get()
        try:
            status = subprocess.call(cmd)
            if status != 0:
                sys.stderr.write(
                    "command exited with status {0}: {1!r}\n".format(status, cmd)
                )
        except Exception as exc:
            # Worker-thread boundary: an uncaught exception here would kill
            # the thread and strand the remaining queue items.
            sys.stderr.write("failed to run {0!r}: {1}\n".format(cmd, exc))
        finally:
            q.task_done()
def get_sync_cmd(day, table, version, delete, dry_run=True):
    """Build the ``aws s3 sync`` argument list for one day's partition.

    Args:
        day: date/datetime object; rendered with the module-level ``fmt``
            pattern to form the ``submission_date_s3`` partition value.
        table: table name path component.
        version: version path component.
        delete: when truthy, ``--delete`` is appended to the command.
        dry_run: when truthy (the default), ``--dryrun`` is appended.

    Returns:
        A list of strings: ``aws s3 sync <source> <destination>`` followed
        by the optional flags, ``--dryrun`` before ``--delete``.
    """
    # Source and destination share the same partition path; only the
    # leading bucket/prefix differs.
    partition = "{table}/{version}/submission_date_s3={date}".format(
        table=table, version=version, date=day.strftime(fmt)
    )

    command = ["aws", "s3", "sync"]
    command.extend(
        "s3://{0}/{1}".format(root, partition) for root in (FROM_PATH, TO_PATH)
    )
    if dry_run:
        command.append("--dryrun")
    if delete:
        command.append("--delete")
    return command
def main(begin, end, table, version, delete, dry_run, num_threads):
    """Sync every daily partition from *begin* to *end* (inclusive).

    One sync command per day is generated with ``get_sync_cmd`` and pushed
    onto a bounded queue that is drained by *num_threads* daemon threads
    running ``do_sync``. The call returns once every queued command has
    been marked done.

    Args:
        begin: first day to sync, as a string in the module-level ``fmt``
            pattern.
        end: last day to sync (inclusive), same pattern. If it is earlier
            than *begin* nothing is queued.
        table: table name, forwarded to ``get_sync_cmd``.
        version: version, forwarded to ``get_sync_cmd``.
        delete: forwarded to ``get_sync_cmd``.
        dry_run: forwarded to ``get_sync_cmd``.
        num_threads: number of worker threads; must be at least 1.

    Raises:
        ValueError: if *num_threads* is less than 1, or if *begin* / *end*
            do not match the date pattern.
    """
    # With no workers nothing would ever drain the queue, and a maxsize <= 0
    # makes the queue unbounded, so q.join() below would block forever.
    if num_threads < 1:
        raise ValueError(
            "num_threads must be at least 1, got {0}".format(num_threads)
        )

    current = datetime.strptime(begin, fmt)
    last_day = datetime.strptime(end, fmt)

    # Bounded queue: put() blocks while the queue is full, so the producer
    # never gets more than num_threads commands ahead of the workers.
    q = Queue.Queue(maxsize=num_threads)
    for _ in range(num_threads):
        worker = threading.Thread(target=do_sync, args=(q,))
        # Daemon threads: the workers loop forever and must not keep the
        # process alive after the queue has been drained.
        worker.daemon = True
        worker.start()

    while current <= last_day:
        q.put(get_sync_cmd(current, table, version, delete, dry_run))
        current += timedelta(days=1)

    # Wait until every queued command has been acknowledged.
    q.join()
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    cli = argparse.ArgumentParser(description='Move backfilled files')

    # Mandatory string options, registered in the order they appear in --help.
    for flag, text in (
        ('--table', "table to move"),
        ('--version', "version to move"),
        ('--start', "Start date in YYYYMMDD"),
        ('--end', "End date in YYYYMMDD"),
    ):
        cli.add_argument(flag, type=str, required=True, help=text)

    # Optional switches; store_true already implies "not required" and a
    # default of False.
    cli.add_argument('--delete', action='store_true',
                     help="Whether to delete files at destination not in source")
    cli.add_argument('--dryrun', action='store_true',
                     help="Whether dry-run is enabled")
    cli.add_argument('--threads', type=int, default=5,
                     help="Specify number of threads")

    options = cli.parse_args()
    main(
        begin=options.start,
        end=options.end,
        table=options.table,
        version=options.version,
        delete=options.delete,
        dry_run=options.dryrun,
        num_threads=options.threads,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment