{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"PTSpark\n",
"\n",
"spark_rsync() uses Spark and PureTools to parallelize an rsync-like directory tree copy.\n",
"\n",
"Usage:\n",
"spark_rsync(\"/mount/path/source\", \"/mount/path/destination\")\n",
"\n",
"Notes on usage:\n",
" * Rsync-*like* semantics, only copies from src->dst if file appears changed\n",
" * Recursive copy\n",
" * Assumes both source and destination are NFS mounts"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Generic imports\n",
"import json\n",
"import os\n",
"import shutil\n",
"import subprocess\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Pyspark imports\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql.functions import UserDefinedFunction\n",
"from pyspark.sql.types import StringType"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Spark counters to understand how many operations the copy job did (skips, removes, copies)\n",
"cntr_skip = sc.accumulator(0)\n",
"cntr_remove = sc.accumulator(0)\n",
"cntr_copy = sc.accumulator(0)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def collect_listings(srcdir, dstdir):\n",
"    \"\"\"Use PureTools to return json-encoded directory listings.\n",
"    Operates on both source and destination in parallel.\"\"\"\n",
"    p1 = subprocess.Popen([\"pls\", \"-a\", \"-R\", \"--json\", srcdir], stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n",
"    p2 = subprocess.Popen([\"pls\", \"-a\", \"-R\", \"--json\", dstdir], stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n",
"\n",
"    (result1, error1) = p1.communicate()\n",
"    (result2, error2) = p2.communicate()\n",
"\n",
"    if p1.returncode or p2.returncode or error1 or error2:\n",
"        raise Exception('Failed listing directories')\n",
"\n",
"    srclisting = result1.decode('utf-8')\n",
"    dstlisting = result2.decode('utf-8')\n",
"\n",
"    return (srclisting, dstlisting)"
]
},
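{
"cell_type": "markdown",
"metadata": {},
"source": [
"The exact line format of `pls --json` is not shown in this notebook; the sketch below uses toy listing lines with the fields the later cells rely on (`path`, `size`, `mtime`) and only illustrates how such a listing is turned into a dataframe. The values are made up, not real pls output."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy listing lines in the assumed pls-style format (one JSON object per line).\n",
"toy_src_listing = [\n",
"    '{\"path\": \"/rsync_dataset/exp/a.bin\", \"size\": 100, \"mtime\": 1}',\n",
"    '{\"path\": \"/rsync_dataset/exp/b.bin\", \"size\": 200, \"mtime\": 2}']\n",
"\n",
"# Same pattern collect_file_metadata() uses: one dataframe row per listing line.\n",
"spark.read.json(sc.parallelize(toy_src_listing)).show()"
]
},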
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def collect_file_metadata(srcdir, dstdir):\n",
"    \"\"\"Build a joined dataframe containing all files/directories in both src and dest.\"\"\"\n",
"\n",
"    # Collect the directory contents at both ends.\n",
"    (src_raw, dst_raw) = collect_listings(srcdir, dstdir)\n",
"\n",
"    # Convert listings to dataframes to parallelize the checks and copies.\n",
"    srcfiles = spark.read.json(sc.parallelize(src_raw.splitlines()))\n",
"    dstfiles = spark.read.json(sc.parallelize(dst_raw.splitlines()))\n",
"\n",
"    # Rename all columns in the destination table with a dst_ prefix to distinguish src and dst in the joined table.\n",
"    dstfiles = dstfiles.rdd.toDF([\"dst_\" + n for n in dstfiles.schema.names])\n",
"\n",
"    # Add a \"filename\" column to each table, which is used to join().\n",
"    basename_udf = UserDefinedFunction(lambda p: os.path.basename(p), StringType())\n",
"\n",
"    srcfiles = srcfiles.withColumn('filename', basename_udf(srcfiles.path))\n",
"    dstfiles = dstfiles.withColumn('filename', basename_udf(dstfiles.dst_path))\n",
"\n",
"    # Join the tables.\n",
"    joinedfiles = srcfiles.join(dstfiles, 'filename', 'outer')\n",
"    return joinedfiles"
]
},
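{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of the join that collect_file_metadata() builds, reusing the toy listing above rather than real pls output, so the dst_-prefixed columns and the filename join key are visible. Rows present on only one side keep nulls on the other side, which is what copier() checks for below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy destination listing: a.bin is unchanged, b.bin is missing at the destination.\n",
"toy_dst_listing = ['{\"path\": \"/rsync_dataset/exp/a.bin\", \"size\": 100, \"mtime\": 1}']\n",
"\n",
"toy_src = spark.read.json(sc.parallelize(toy_src_listing))\n",
"toy_dst = spark.read.json(sc.parallelize(toy_dst_listing))\n",
"\n",
"# Same steps as collect_file_metadata(): prefix dst columns, add filename, outer join.\n",
"toy_dst = toy_dst.rdd.toDF([\"dst_\" + n for n in toy_dst.schema.names])\n",
"toy_basename = UserDefinedFunction(lambda p: os.path.basename(p), StringType())\n",
"toy_src = toy_src.withColumn('filename', toy_basename(toy_src.path))\n",
"toy_dst = toy_dst.withColumn('filename', toy_basename(toy_dst.dst_path))\n",
"toy_src.join(toy_dst, 'filename', 'outer').show()"
]
},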
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def copier(srcmount, dstmount, entry):\n", | |
" \n", | |
" \"\"\"Evaluate and operate on an individual file record.\n", | |
" Possible actions: skip, copy, or remove.\"\"\"\n", | |
" if entry[\"filename\"] in [\".\", \"..\"]:\n", | |
" return\n", | |
" \n", | |
" # Skip if we think this file is already at destination, unchanged.\n", | |
" if entry[\"dst_size\"] and entry[\"size\"] == entry[\"dst_size\"] and entry[\"mtime\"] == entry[\"dst_mtime\"]:\n", | |
" cntr_skip.add(1)\n", | |
" return\n", | |
" \n", | |
" srcpath = os.path.join(srcmount, \".\" + entry[\"path\"])\n", | |
" dstpath = srcpath.replace(srcmount, dstmount, 1)\n", | |
" \n", | |
" # Remove files that are at the destination but not source.\n", | |
" if not entry[\"size\"] and entry[\"dst_size\"]:\n", | |
" cntr_remove.add(1)\n", | |
" os.remove(srcpath)\n", | |
" \n", | |
" if os.path.isfile(srcpath):\n", | |
" cntr_copy.add(1)\n", | |
" # Copy2 attempts to preserve metadata also\n", | |
" shutil.copy2(srcpath, dstpath)\n", | |
" else:\n", | |
" cntr_skip.add(1)" | |
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def find_mount(path):\n",
"    \"\"\"Helper function to find the mount point of a full path.\"\"\"\n",
"    while not os.path.ismount(path):\n",
"        path = os.path.dirname(path)\n",
"    return path"
]
},
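{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick check of find_mount(), assuming /datahub is the NFS mount point behind the source directory used in the run at the bottom; on another system the mount point will differ."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Walks up the path until os.path.ismount() is true, e.g. '/datahub' here.\n",
"find_mount(\"/datahub/rsync_dataset/exp3-7\")"
]
},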
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def spark_rsync(srcdir, dstdir):\n", | |
" \n", | |
" \"\"\"Run Spark-ified rsync code.\n", | |
" Returns run-time breakdown tuple of 1) PureTools enumeration and 2) parallelized copy operations.\"\"\"\n", | |
" \n", | |
" srcmount = find_mount(srcdir)\n", | |
" destmount = find_mount(dstdir)\n", | |
" \n", | |
" # Create destination if not already exists.\n", | |
" os.makedirs(dstdir, exist_ok=True)\n", | |
" \n", | |
" # Three steps to this process: 1) collect metadata, 2) build directory tree, 3) copy files.\n", | |
" \n", | |
" # Collect metadata (directories and files) on both src and dst trees.\n", | |
" start_time = time.time()\n", | |
" joinedfiles = collect_file_metadata(srcdir, dstdir)\n", | |
" \n", | |
" # Second, find the unique list of directories and ensure they exist correctly.\n", | |
" directories = joinedfiles.select(\"path\").rdd \\\n", | |
" .map(lambda x: x[\"path\"]) \\\n", | |
" .map(lambda x: os.path.dirname(x)) \\\n", | |
" .distinct() \\\n", | |
" .map(lambda x: os.path.normpath(os.path.join(destmount, \".\" + x))) \\\n", | |
" .map(lambda x: x.replace(srcdir, dstdir, 1))\n", | |
" \n", | |
" mid_time = time.time()\n", | |
" directories.foreach(lambda dir: os.makedirs(dir, exist_ok=True))\n", | |
" \n", | |
" # Finally, invoke parallel copy evaluator for all files.\n", | |
" joinedfiles.foreach(lambda e: copier(srcmount, destmount, e))\n", | |
" \n", | |
" end_time = time.time()\n", | |
" return (mid_time - start_time, end_time - mid_time)" | |
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(212.46247553825378, 189.51126623153687)"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"src = \"/datahub/rsync_dataset/exp3-7\"\n",
"dst = \"/acadia/rsync_dataset/exp3-7\"\n",
"spark_rsync(src, dst)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}