Run an activated Python env on a Spark cluster
"""Start a Spark application using the currently activated Python
environment on the cluster.

Example usage:
1. conda activate <env_name>
2. run the script (below are some easy ways to do so):
   jupyter notebook:
       python run_custom_spark_env.py pyspark -d jupyter -o notebook
   jupyter lab:
       python run_custom_spark_env.py pyspark -d jupyter -o lab
   spark-submit:
       python run_custom_spark_env.py spark-submit -p test.py
"""
import os
import argparse
import sys
import pathlib
import shutil
import logging
import subprocess


def read_options():
    parser = argparse.ArgumentParser(
        description="""
        Start a Spark application using the activated Python
        environment on the cluster.
        """,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="subcommand")

    # subparser for the pyspark shell
    parser_shell = subparsers.add_parser("pyspark")
    parser_shell.add_argument(
        "-d",
        "--python-driver",
        help="python driver application to use. Ex: 'jupyter' to use jupyter.",
        default=sys.executable,
    )
    parser_shell.add_argument(
        "-o",
        "--python-driver-opts",
        help="python driver application options. Ex: 'notebook' or 'lab' for jupyter.",
        default="",
    )

    # subparser for spark-submit
    parser_spark_submit = subparsers.add_parser("spark-submit")
    parser_spark_submit.add_argument(
        "-p",
        "--python-app",
        help="python application to run from spark-submit",
        required=True,
    )
    parser_spark_submit.add_argument(
        "--python-app-opts",
        help="additional arguments to supply to your python application as a string",
        default="",
    )

    # top-level options: argparse requires these to appear before the
    # subcommand on the command line
    parser.add_argument(
        "--spark-opts",
        help="additional arguments to pass to spark as a string. Ex: conf settings.",
        default="",
    )
    parser.add_argument(
        "-u",
        "--update-zip",
        action="store_true",
        help="if set, replace the current zip archive if found",
    )
    return parser.parse_args()
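
# For illustration (hypothetical values): because --spark-opts and
# --update-zip live on the top-level parser, they must precede the
# subcommand, e.g.
#     python run_custom_spark_env.py \
#         --spark-opts '--conf spark.executor.memory=4g' spark-submit -p test.py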


if __name__ == "__main__":
    options = read_options()
    logging.basicConfig(level=logging.NOTSET)
    logging.root.setLevel(logging.NOTSET)
    logger = logging.getLogger("run_custom_env_on_spark")

    # set SPARK_HOME to its default location on EMR
    os.environ["SPARK_HOME"] = "/usr/lib/spark"

    python_path = sys.executable
    # directory of the virtualenv (one level above .../bin/python)
    venv_path = pathlib.PurePath(python_path).parent.parent
    # virtualenv folder name
    venv_name = venv_path.name
    # folder containing the virtualenv
    env_parent_path = venv_path.parent
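    # For illustration (hypothetical path): if sys.executable is
    # /opt/conda/envs/myenv/bin/python, then venv_path is
    # /opt/conda/envs/myenv, venv_name is "myenv", and env_parent_path
    # is /opt/conda/envs.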
    # zip up the venv so it can be shipped to the executors, if the
    # archive doesn't exist yet or --update-zip was passed
    if (
        not os.path.exists("{venv_name}.zip".format(venv_name=venv_name))
        or options.update_zip
    ):
        logger.info("Zipping up environment. This might take some time...")
        # make a zip archive of the venv
        shutil.make_archive(
            venv_name,
            "zip",
            root_dir=str(env_parent_path),
            base_dir=venv_name,
            logger=logger,
        )
        logger.info("Environment zipped. Starting application...")
    # run spark using the local venv for the driver (pyspark mode)
    # and the shipped venv for the executors
    if options.subcommand == "pyspark":
        os.environ["PYSPARK_DRIVER_PYTHON"] = options.python_driver
        os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = options.python_driver_opts
        # set python for the driver
        options.spark_opts += ' --conf "spark.pyspark.driver.python={python_path}"'.format(
            python_path=options.python_driver
        )
    elif options.subcommand == "spark-submit":
        # set python for the driver
        options.spark_opts += ' --conf "spark.pyspark.driver.python={python_path}"'.format(
            python_path=sys.executable
        )

    # set python for the executors (path inside the unpacked archive)
    options.spark_opts += ' --conf "spark.pyspark.python=./{venv_name}_zip/{venv_name}/bin/python"'.format(
        venv_name=venv_name
    )
    # ship the zip archive with the job
    options.spark_opts += ' --archives "{cur_path}/{venv_name}.zip#{venv_name}_zip"'.format(
        cur_path=os.getcwd(), venv_name=venv_name
    )
    command_list = [options.subcommand] + options.spark_opts.split()
    # append the spark-submit python file and its arguments
    if options.subcommand == "spark-submit":
        command_list += [options.python_app]
        command_list += options.python_app_opts.split()
    # execute the assembled command through the shell
    subprocess.call(" ".join(command_list), shell=True)
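
# For illustration, a hypothetical run (paths are examples, not taken from
# this gist): with an environment at /opt/conda/envs/myenv activated and
# /home/user as the working directory,
#
#     python run_custom_spark_env.py spark-submit -p test.py
#
# assembles roughly this command:
#
#     spark-submit \
#         --conf "spark.pyspark.driver.python=/opt/conda/envs/myenv/bin/python" \
#         --conf "spark.pyspark.python=./myenv_zip/myenv/bin/python" \
#         --archives "/home/user/myenv.zip#myenv_zip" \
#         test.py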