Skip to content

Instantly share code, notes, and snippets.

@sunahsuh
Created April 5, 2018 00:01
Show Gist options
  • Save sunahsuh/3ff14afd55d6e45afbf48c0c0032010b to your computer and use it in GitHub Desktop.
Save sunahsuh/3ff14afd55d6e45afbf48c0c0032010b to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from datetime import datetime, timedelta
import argparse
import threading
import Queue
import subprocess
# Date format (YYYYMMDD) used both to parse the --start/--end arguments and to
# render the submission_date_s3 partition value in the S3 keys.
fmt = "%Y%m%d"
# Bucket portion of the source S3 URL (substituted right after "s3://").
FROM_PATH = "telemetry-parquet"
# Bucket plus key prefix of the destination S3 URL.
TO_PATH = "telemetry-test-bucket/ssuh/copy_tests"
def do_sync(q):
    """Worker loop: pull commands off ``q`` and run them, forever.

    Each queue item is handed to ``subprocess.call``. The item is always
    marked done, even when the command cannot be launched, so a producer
    blocked in ``q.join()`` or ``q.put()`` cannot hang because a worker died.
    Failures (launch errors and non-zero exit codes) are reported on stderr
    instead of being dropped silently.

    This function never returns; it is intended to run in a daemon thread.

    Args:
        q: queue whose items are argument lists accepted by
            ``subprocess.call``.
    """
    import sys  # local import: the file header does not import sys

    while True:
        cmd = q.get()
        try:
            status = subprocess.call(cmd)
            if status != 0:
                sys.stderr.write(
                    "command exited with status {0}: {1!r}\n".format(status, cmd)
                )
        except Exception as exc:
            # Thread boundary: keep the worker alive so the remaining queue
            # items are still processed, but make the failure visible.
            sys.stderr.write("failed to run {0!r}: {1}\n".format(cmd, exc))
        finally:
            # Must run on every path, otherwise q.join() never returns.
            q.task_done()
def get_sync_cmd(day, table, version, delete, dry_run=True):
    """Build the ``aws s3 sync`` argument list for one day's partition.

    Args:
        day: object with ``strftime`` (e.g. a ``datetime``) selecting the
            ``submission_date_s3`` partition.
        table: table name component of the S3 key.
        version: version component of the S3 key.
        delete: when truthy, append ``--delete`` to the command.
        dry_run: when truthy (the default), append ``--dryrun``.

    Returns:
        A list of strings: ``aws s3 sync <source> <destination>`` followed by
        the optional flags, ``--dryrun`` before ``--delete``.
    """
    # Fields shared by the source and destination key templates.
    shared = dict(date=day.strftime(fmt), table=table, version=version)
    source_url = "s3://{from_bucket}/{table}/{version}/submission_date_s3={date}".format(from_bucket=FROM_PATH, **shared)
    dest_url = "s3://{to_bucket}/{table}/{version}/submission_date_s3={date}".format(to_bucket=TO_PATH, **shared)

    command = ["aws", "s3", "sync", source_url, dest_url]
    for enabled, flag in ((dry_run, "--dryrun"), (delete, "--delete")):
        if enabled:
            command.append(flag)
    return command
def main(begin, end, table, version, delete, dry_run, num_threads):
    """Sync every daily partition from ``begin`` to ``end`` (inclusive).

    Starts ``num_threads`` daemon worker threads running ``do_sync``, queues
    one sync command per day, and blocks until every queued command has been
    processed.

    Args:
        begin: first day to sync, as a string in the module's ``fmt`` format.
        end: last day to sync (inclusive), same format as ``begin``.
        table: table name passed through to ``get_sync_cmd``.
        version: version passed through to ``get_sync_cmd``.
        delete: passed through to ``get_sync_cmd``.
        dry_run: passed through to ``get_sync_cmd``.
        num_threads: number of worker threads; also the queue's capacity.

    Raises:
        ValueError: if ``num_threads`` is less than 1, or if ``begin``/``end``
            do not match the date format (raised by ``datetime.strptime``).
    """
    # Without at least one worker nothing would ever drain the queue and
    # q.join() below would block forever, so reject that up front.
    if num_threads < 1:
        raise ValueError(
            "num_threads must be at least 1, got {0}".format(num_threads)
        )

    current = datetime.strptime(begin, fmt)
    end = datetime.strptime(end, fmt)

    # Bounded queue: the producer blocks in put() once every worker has a
    # pending command, instead of building all commands up front.
    q = Queue.Queue(maxsize=num_threads)
    for _ in range(num_threads):
        worker = threading.Thread(target=do_sync, args=(q,))
        # Daemon threads: the workers loop forever and must not keep the
        # process alive once main() returns.
        worker.daemon = True
        worker.start()

    while current <= end:
        q.put(get_sync_cmd(current, table, version, delete, dry_run))
        current += timedelta(days=1)

    # Wait until every queued command has been marked done.
    q.join()
if __name__ == "__main__":
    # Script entry point: parse the command line and hand the values to main().
    cli = argparse.ArgumentParser(description='Move backfilled files')

    # Mandatory string options, registered in the order they appear in --help.
    for flag, help_text in (
        ('--table', "table to move"),
        ('--version', "version to move"),
        ('--start', "Start date in YYYYMMDD"),
        ('--end', "End date in YYYYMMDD"),
    ):
        cli.add_argument(flag, type=str, required=True, help=help_text)

    # Optional boolean switches; both are off unless given.
    for flag, help_text in (
        ('--delete', "Whether to delete files at destination not in source"),
        ('--dryrun', "Whether dry-run is enabled"),
    ):
        cli.add_argument(flag, required=False, default=False, action='store_true', help=help_text)

    cli.add_argument('--threads', type=int, required=False, default=5, help="Specify number of threads")

    opts = cli.parse_args()
    main(opts.start, opts.end, opts.table, opts.version, opts.delete, opts.dryrun, opts.threads)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment