@BrendanSchell
Last active April 27, 2020 11:38
Run an activated Python env on a Spark cluster

import os
import argparse
import sys
import pathlib
import shutil
import logging
import subprocess
"""This is a program to start a spark application using the activated
python environment on the cluster
example usage:
1. conda activate <env_name>
2. run script (below are some easy ways to do so):
jupyter notebook:
python run_custom_spark_env.py pyspark -d jupyter -o notebook
jupyter lab:
python run_custom_spark_env.py pyspark -d jupyter -o lab
spark-submit
python run_custom_spark_env.py spark-submit -p test.py
"""
def read_options():
    # Make parser object
    parser = argparse.ArgumentParser(
        description="""
        This is a program to start a spark application using the activated
        python environment on the cluster
        """,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="subcommand")

    # subparser for the pyspark shell
    parser_shell = subparsers.add_parser("pyspark")
    parser_shell.add_argument(
        "-d",
        "--python-driver",
        help="python driver application to use. Ex: 'jupyter' to use jupyter.",
        default=sys.executable,
    )
    parser_shell.add_argument(
        "-o",
        "--python-driver-opts",
        help="python driver application options. Ex: 'notebook' or 'lab' for jupyter.",
        default="",
    )

    # subparser for spark-submit
    parser_spark_submit = subparsers.add_parser("spark-submit")
    parser_spark_submit.add_argument(
        "-p",
        "--python-app",
        help="python application to run from spark-submit",
        required=True,
    )
    parser_spark_submit.add_argument(
        "--python-app-opts",
        help="additional arguments to supply to your python application as a string",
        default="",
    )

    # options shared by both subcommands
    parser.add_argument(
        "--spark-opts",
        help="additional arguments to pass to spark as a string. Ex: conf settings.",
        default="",
    )
    parser.add_argument(
        "-u",
        "--update-zip",
        action="store_const",
        const=True,
        default=False,
        help="if set, will replace the current zip archive if found",
    )
    return parser.parse_args()


if __name__ == "__main__":
    options = read_options()
    logging.basicConfig(level=logging.NOTSET)
    logging.root.setLevel(logging.NOTSET)

    # set spark home to the default location for EMR
    os.environ["SPARK_HOME"] = "/usr/lib/spark"

    python_path = sys.executable
    parent_dir = pathlib.PurePath(python_path).parent
    # directory of the venv
    venv_path = pathlib.PurePath(python_path).parent.parent
    # venv folder name
    venv_name = venv_path.name
    # folder containing the venv
    env_parent_path = venv_path.parent

    logger = logging.getLogger("run_custom_env_on_spark")

    # zip up the venv so it can be shipped, if the archive doesn't exist yet
    # or --update-zip was passed
    if (
        not os.path.exists("{venv_name}.zip".format(venv_name=venv_name))
        or options.update_zip
    ):
        logger.info("Zipping up environment. This might take some time...")
        # make zip archive of the venv
        shutil.make_archive(
            "{venv_name}".format(venv_name=venv_name),
            "zip",
            root_dir=str(env_parent_path),
            base_dir=venv_name,
            logger=logger,
        )
        logger.info("Environment zipped. Starting application...")

    # run spark using the local venv for the driver (in pyspark mode) and the
    # shipped venv for the executors
    if options.subcommand == "pyspark":
        os.environ["PYSPARK_DRIVER_PYTHON"] = options.python_driver
        os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = options.python_driver_opts
        # set python for the driver
        options.spark_opts += ' --conf "spark.pyspark.driver.python={python_path}"'.format(
            python_path=options.python_driver
        )
    elif options.subcommand == "spark-submit":
        # set python for the driver
        options.spark_opts += ' --conf "spark.pyspark.driver.python={python_path}"'.format(
            python_path=sys.executable
        )

    # set python for the executors
    options.spark_opts += ' --conf "spark.pyspark.python=./{venv_name}_zip/{venv_name}/bin/python" '.format(
        venv_name=venv_name
    )
    # ship the zipped venv to the executors
    options.spark_opts += ' --archives "{cur_path}/{venv_name}.zip#{venv_name}_zip"'.format(
        cur_path=os.getcwd(), venv_name=venv_name
    )

    command_list = [options.subcommand] + options.spark_opts.split()
    # add the spark-submit python file and its arguments
    if options.subcommand == "spark-submit":
        command_list += [options.python_app]
        command_list += options.python_app_opts.split()

    # execute the command
    subprocess.call(" ".join(command_list), shell=True)
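
For reference, here is a minimal sketch of how the shipped archive and the executor Python path fit together; the environment path below is illustrative and assumes the usual <parent>/<env_name>/bin/python layout that the script relies on:

    import sys
    import pathlib

    # e.g. /home/hadoop/envs/myenv/bin/python (illustrative path)
    python_path = pathlib.PurePath(sys.executable)
    venv_name = python_path.parent.parent.name  # -> "myenv"

    # The venv is shipped with --archives as "<cwd>/myenv.zip#myenv_zip", so
    # YARN (as on EMR) unpacks it into ./myenv_zip inside each executor's
    # working directory. That is why spark.pyspark.python is pointed at the
    # relative path below.
    executor_python = "./{0}_zip/{0}/bin/python".format(venv_name)
    print(executor_python)  # ./myenv_zip/myenv/bin/python

Shipping the zipped environment this way means the worker nodes do not need the packages installed ahead of time; each executor unpacks its own copy of the environment next to its container.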