@joshuarobinson
Created June 20, 2019 15:45
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"PTSpark\n",
"\n",
"spark_rsync() uses Spark and PureTools to parallelize an rsync-like directory tree copy.\n",
"\n",
"Usage:\n",
"spark_rsync(\"/mount/path/source\", \"/mount/path/destination\")\n",
"\n",
"Notes on usage:\n",
" * Rsync-*like* semantics, only copies from src->dst if file appears changed\n",
" * Recursive copy\n",
" * Assumes both source and destination are NFS mounts"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Generic imports\n",
"import json\n",
"import os\n",
"import shutil\n",
"import subprocess\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Pyspark imports\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql.functions import UserDefinedFunction\n",
"from pyspark.sql.types import StringType"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Spark counters to understand how many operations the copy job did (skips, removes, copies)\n",
"cntr_skip = sc.accumulator(0)\n",
"cntr_remove = sc.accumulator(0)\n",
"cntr_copy = sc.accumulator(0)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def collect_listings(srcdir, dstdir):\n",
" \"\"\"Use PureTools to return json-encoded directory listings.\n",
" Operates on both source and destination in parallel.\"\"\"\n",
" p1 = subprocess.Popen([\"pls\", \"-a\", \"-R\", \"--json\", srcdir], stdout = subprocess.PIPE, stderr = subprocess.PIPE)\n",
" p2 = subprocess.Popen([\"pls\", \"-a\", \"-R\", \"--json\", dstdir], stdout = subprocess.PIPE, stderr = subprocess.PIPE)\n",
" \n",
" (result1, error1) = p1.communicate()\n",
" (result2, error2) = p2.communicate()\n",
" \n",
" if p1.returncode or p2.returncode or error1 or error2:\n",
" raise Exception('Failed listing directories')\n",
" \n",
" srclisting = result1.decode('utf-8')\n",
" dstlisting = result2.decode('utf-8')\n",
" \n",
" return (srclisting, dstlisting)"
]
},
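{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a hypothetical sketch of one line of `pls -a -R --json` output. The field names (`path`, `size`, `mtime`) are inferred from how the cells below read them; the actual PureTools record format may differ and likely carries additional fields."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustration only: parse one assumed pls record, the same shape spark.read.json() ingests below.\n",
"example_line = '{\"path\": \"/rsync_dataset/exp3-7/file001\", \"size\": 4096, \"mtime\": 1560000000}'\n",
"json.loads(example_line)"
]
},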
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def collect_file_metadata(srcdir, dstdir):\n",
" \"\"\"Build joined dataframe containing all files/directories in both src and dest.\"\"\"\n",
" \n",
" # Collect the directory contents at both ends.\n",
" (src_raw, dst_raw) = collect_listings(srcdir, dstdir)\n",
" \n",
" # Convert listings to dataframes to parallelize the checks and copies.\n",
" srcfiles = spark.read.json(sc.parallelize(src_raw.splitlines()))\n",
" dstfiles = spark.read.json(sc.parallelize(dst_raw.splitlines()))\n",
" \n",
" # Rename all columns in destination table to have a prefix, distinguishes src and dst in joined table.\n",
" dstfiles = dstfiles.rdd.toDF([\"dst_\" + n for n in dstfiles.schema.names])\n",
" \n",
" # Add a column in each table for \"filename\" which will be used to join().\n",
" basename_udf = UserDefinedFunction(lambda p: os.path.basename(p), StringType())\n",
"\n",
" srcfiles = srcfiles.withColumn('filename', basename_udf(srcfiles.path))\n",
" dstfiles = dstfiles.withColumn('filename', basename_udf(dstfiles.dst_path))\n",
" \n",
" # Join tables.\n",
" joinedfiles = srcfiles.join(dstfiles, 'filename', 'outer')\n",
" return joinedfiles"
]
},
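{
"cell_type": "markdown",
"metadata": {},
"source": [
"The outer join keeps every file that exists on either side: source columns keep their original names, destination columns carry the `dst_` prefix, and the missing side shows up as null. Below is a hypothetical sketch (plain dicts standing in for Spark Row objects, with made-up values) of the three kinds of rows that `copier()` in the next code cell has to handle."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical joined rows, for illustration only.\n",
"row_unchanged = {\"filename\": \"a.dat\", \"path\": \"/exp/a.dat\", \"size\": 4096, \"mtime\": 100,\n",
"                 \"dst_path\": \"/exp/a.dat\", \"dst_size\": 4096, \"dst_mtime\": 100}   # -> skip\n",
"row_new = {\"filename\": \"b.dat\", \"path\": \"/exp/b.dat\", \"size\": 1024, \"mtime\": 200,\n",
"           \"dst_path\": None, \"dst_size\": None, \"dst_mtime\": None}                # -> copy\n",
"row_deleted = {\"filename\": \"c.dat\", \"path\": None, \"size\": None, \"mtime\": None,\n",
"               \"dst_path\": \"/exp/c.dat\", \"dst_size\": 2048, \"dst_mtime\": 50}      # -> remove"
]
},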
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def copier(srcmount, dstmount, entry):\n",
" \n",
" \"\"\"Evaluate and operate on an individual file record.\n",
" Possible actions: skip, copy, or remove.\"\"\"\n",
" if entry[\"filename\"] in [\".\", \"..\"]:\n",
" return\n",
" \n",
" # Skip if we think this file is already at destination, unchanged.\n",
" if entry[\"dst_size\"] and entry[\"size\"] == entry[\"dst_size\"] and entry[\"mtime\"] == entry[\"dst_mtime\"]:\n",
" cntr_skip.add(1)\n",
" return\n",
" \n",
" srcpath = os.path.join(srcmount, \".\" + entry[\"path\"])\n",
" dstpath = srcpath.replace(srcmount, dstmount, 1)\n",
" \n",
" # Remove files that are at the destination but not source.\n",
" if not entry[\"size\"] and entry[\"dst_size\"]:\n",
" cntr_remove.add(1)\n",
" os.remove(srcpath)\n",
" \n",
" if os.path.isfile(srcpath):\n",
" cntr_copy.add(1)\n",
" # Copy2 attempts to preserve metadata also\n",
" shutil.copy2(srcpath, dstpath)\n",
" else:\n",
" cntr_skip.add(1)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def find_mount(path):\n",
" \"\"\"Helper function to find the mount point of a full path.\"\"\"\n",
" while not os.path.ismount(path):\n",
" path = os.path.dirname(path)\n",
" return path"
]
},
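{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick sanity check of find_mount(). \"/\" is always a mount point, so this returns \"/\".\n",
"# On a host with the NFS share mounted, find_mount(\"/datahub/rsync_dataset/exp3-7\") would\n",
"# walk up to \"/datahub\" (hypothetical; depends on the local mount layout).\n",
"find_mount(\"/\")"
]
},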
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def spark_rsync(srcdir, dstdir):\n",
" \n",
" \"\"\"Run Spark-ified rsync code.\n",
" Returns run-time breakdown tuple of 1) PureTools enumeration and 2) parallelized copy operations.\"\"\"\n",
" \n",
" srcmount = find_mount(srcdir)\n",
" destmount = find_mount(dstdir)\n",
" \n",
" # Create destination if not already exists.\n",
" os.makedirs(dstdir, exist_ok=True)\n",
" \n",
" # Three steps to this process: 1) collect metadata, 2) build directory tree, 3) copy files.\n",
" \n",
" # Collect metadata (directories and files) on both src and dst trees.\n",
" start_time = time.time()\n",
" joinedfiles = collect_file_metadata(srcdir, dstdir)\n",
" \n",
" # Second, find the unique list of directories and ensure they exist correctly.\n",
" directories = joinedfiles.select(\"path\").rdd \\\n",
" .map(lambda x: x[\"path\"]) \\\n",
" .map(lambda x: os.path.dirname(x)) \\\n",
" .distinct() \\\n",
" .map(lambda x: os.path.normpath(os.path.join(destmount, \".\" + x))) \\\n",
" .map(lambda x: x.replace(srcdir, dstdir, 1))\n",
" \n",
" mid_time = time.time()\n",
" directories.foreach(lambda dir: os.makedirs(dir, exist_ok=True))\n",
" \n",
" # Finally, invoke parallel copy evaluator for all files.\n",
" joinedfiles.foreach(lambda e: copier(srcmount, destmount, e))\n",
" \n",
" end_time = time.time()\n",
" return (mid_time - start_time, end_time - mid_time)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(212.46247553825378, 189.51126623153687)"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"src = \"/datahub/rsync_dataset/exp3-7\"\n",
"dst = \"/acadia/rsync_dataset/exp3-7\"\n",
"spark_rsync(src, dst)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}