PySpark on YARN in self-contained environments

Author: https://github.com/seanorama

Reference:

Why

By default, PySpark jobs use the Python installed on the local system of each YARN NodeManager host. This means:

  • If a different version of Python is needed, it must be installed and maintained on all of the hosts.
  • If additional modules (e.g. from pip) are needed, they must be installed and maintained on all of the hosts.

This adds a lot of operational overhead and risk, since every host must be kept in sync. Also, Spark developers typically do not have direct access to the YARN NodeManager hosts.

Further, it is good practice to manage dependencies from the development side, which is only possible if all Python dependencies are "self-contained".

Overview

There are two ways to get a self-contained environment:

  • a) Use an archive (e.g. tar.gz) of a Python environment (virtualenv or conda):
    • Benefits:
      • Faster start-up than creating an empty virtualenv at run-time, since the packages are already present.
      • Uses the YARN Shared cache: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/SharedCache.html
        • The environment is cached on first use, so later jobs load it much faster.
      • Not tied to the Python interpreter installed on the NodeManager machines, so any Python version can be used and nothing has to be installed on the NodeManagers.
      • Completely self-contained. No external dependencies.
    • Caveats:
      • Can't install additional packages at run-time.
  • b) Use spark.pyspark.virtualenv, which creates a new virtualenv at Spark run-time:
    • Benefits:
      • Install packages at runtime.
    • Caveats:
      • Not entirely self-contained, since it depends on the interpreter being available on all YARN NodeManager hosts (e.g. at /usr/bin/python3 or wherever you place it). This also makes it harder to change versions.
      • Very slow, as every job must download and build the pip packages.
      • Depends on public internet access, or a local network pypi/conda repo.

How to: Use an archive (e.g. tar.gz) of a Python environment (virtualenv or conda):

Overview:

  1. Create an environment with virtualenv or conda
  2. Archive the environment to a .tar.gz or .zip.
  3. Upload the archive to HDFS
  4. Tell Spark (via spark-submit, pyspark, livy, zeppelin) to use this environment
  5. Repeat for each different virtualenv that is required or when the virtualenv needs updating

(Optional) Create a shared HDFS path for storing the environment(s):

Only necessary if sharing the environment and if a location doesn't already exist.

sudo -u hdfs -i

## if kerberos
keytab=/etc/security/keytabs/hdfs.headless.keytab
kinit -kt ${keytab} $(klist -kt ${keytab}| awk '{print $NF}'|tail -1)
## endif

hdfs dfs -mkdir -p /share/python-envs
hdfs dfs -chmod -R 775 /share

## replace the group with a user group that will be managing the archives
hdfs dfs -chown -R hdfs:hadoop /share

exit

Create archive using Python virtualenv and tar

  1. Install python3 and python-virtualenv on a host with the same operating system and CPU architecture as the cluster, such as an edge host:
    • Either in your home directory, without root/sudo access, by:
      • downloading Python manually
      • using pyenv or a similar tool
    • Or system-wide:
sudo yum install python3 python-virtualenv
  2. Create the archive:
## Create requirements for `pip`
tee requirements.txt > /dev/null << EOF
arrow
jupyter
numpy
pandas
scikit-learn
EOF

## The name of the environment
env="python3-venv"

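## Create the environment (use ONE of the two options below, not both)

## Option 1: the built-in venv module
python3 -m venv ${env} --copies
## Option 2: the virtualenv tool (uncomment to use instead of option 1)
# virtualenv --python=/usr/bin/python3 ${env}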

source ${env}/bin/activate
pip install -U pip
pip install -r requirements.txt
deactivate

## Archive the contents of the environment (not the directory itself), so that bin/python sits at the root of the archive
cd ${env}
tar -zcf ../${env}.tar.gz *
cd ..

Or create archive using conda and conda-pack

  1. Install Anaconda or Conda on a host with the same operating system and CPU architecture as the cluster, such as an edge host:

  2. Create the archive:

## Create requirements for `conda`
tee requirements.txt > /dev/null << EOF
arrow
jupyter
numpy
pandas
scikit-learn
EOF

## The name of the environment
env="python3-venv"

## Create the environment
## May give a file-system error if conda has not been initialized. If so, simply run it again.
conda create -y -n ${env} python=3.7
conda activate ${env}
## `--file` reads the package list from a file (`-f` is the short form of a different option, not of `--file`)
conda install -y -n ${env} --file requirements.txt
conda install -y -n ${env} -c conda-forge conda-pack

## Archive the environment
conda pack -f -n ${env} -o ${env}.tar.gz

conda deactivate
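
Both archive methods should leave `bin/python` at the root of the archive, because the submit step points PYSPARK_PYTHON at `./environment/bin/python`. If the directory itself is archived (so entries start with `python3-venv/`), that path will not resolve. A minimal sketch of a check, assuming a gzip-compressed tar archive; the helper name `archive_has_root_python` is hypothetical and not part of any tool:

```shell
# Hypothetical helper: succeed only if the archive has bin/python at its root.
# Usage: archive_has_root_python python3-venv.tar.gz
archive_has_root_python() {
  local archive="$1"
  # List the entries, strip an optional leading "./", and look for an
  # exact match so that "python3-venv/bin/python" does not count.
  tar -tzf "${archive}" | sed 's|^\./||' | grep -x 'bin/python' > /dev/null
}

# Example usage (commented out; run it where the archive was created):
#   archive_has_root_python "${env}.tar.gz" && echo "layout ok"
```

Running this before uploading to HDFS is cheaper than discovering a missing interpreter from the YARN logs.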

Use the archive: Same steps for a virtualenv or for conda:

  1. Put to HDFS:
    • You may need to kinit first. This can be done as your own user.
hdfs dfs -put -f ${env}.tar.gz /share/python-envs/
hdfs dfs -chmod 0664 /share/python-envs/${env}.tar.gz
  2. Test it:
## Create test script
tee test.py > /dev/null << EOF
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setAppName('pyspark-test')
sc = SparkContext(conf=conf)

import numpy
print("Hello World!")
## Print the result so the numpy version seen by the executors appears in the logs
print(sc.parallelize(range(1,10)).map(lambda x : numpy.__version__).collect())
EOF

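## Leave any active environment first (run whichever applies)
deactivate
conda deactivate

## Submit to Spark
## Note: `--archives` can be used instead of `--conf spark.yarn.dist.archives`. I prefer to see the full conf statement.
## The `#environment` suffix names the directory the archive is unpacked into inside each YARN container, which is why PYSPARK_PYTHON points at ./environment/bin/python.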
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--conf spark.yarn.dist.archives=hdfs:///share/python-envs/${env}.tar.gz#environment \
--master yarn \
--deploy-mode cluster \
test.py

## Check the logs: Update the `id` to the id of your job from above.
id=application_GetTheIdFromOutputOfCommandAbove
yarn logs -applicationId ${id} | grep "Hello World"
  3. Update Zeppelin %livy2.pyspark:
    • Note: This only works with YARN cluster deploy mode, which is the default in HDP3.
    • In Zeppelin: open the menu at the top right -> Interpreters -> add to the livy2 interpreter:
livy.spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python
livy.spark.yarn.dist.archives=hdfs:///share/python-envs/python3-venv.tar.gz#environment
    • Test from a notebook:
%livy2.pyspark
import numpy
print("Hello World!")
sc.parallelize(range(1,10)).map(lambda x : numpy.__version__).collect()
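
For the "Test it" step above, the application id can be pulled out of the spark-submit output instead of being copied by hand. A minimal sketch, assuming the output has been saved to a file and that ids follow the usual YARN form `application_<clusterTimestamp>_<sequence>`; `extract_app_id` and `submit.log` are hypothetical names:

```shell
# Hypothetical helper: read spark-submit output on stdin and print the first
# YARN application id found. Exits non-zero if no id is present.
extract_app_id() {
  awk '
    found == "" && match($0, /application_[0-9]+_[0-9]+/) {
      found = substr($0, RSTART, RLENGTH)
    }
    END {
      if (found != "") print found
      else exit 1
    }
  '
}

# Example usage (commented out because it needs a cluster). spark-submit
# usually logs to stderr, hence the 2>&1:
#   spark-submit ... test.py 2>&1 | tee submit.log
#   id=$(extract_app_id < submit.log)
#   yarn logs -applicationId "${id}" | grep "Hello World"
```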

How to: Create a virtualenv at run-time

  1. Install python3 and python-virtualenv

    • For this method, this must be done on all hosts in the cluster.
    • See the archive instructions above for details on how to install.
  2. Example using pyspark shell:

PYSPARK_PYTHON=/usr/bin/python3 \
pyspark \
--conf spark.pyspark.virtualenv.enabled=true \
--conf spark.pyspark.virtualenv.type=native \
--conf spark.pyspark.virtualenv.bin.path=/usr/bin/virtualenv \
--conf spark.pyspark.virtualenv.python_version=3.6 \
--master yarn \
--deploy-mode client

## then, in the shell, install packages:
sc.install_packages(["numpy"])
  3. Example using spark-submit:
    • Note the addition of a requirements file.
      • This is optional. You could instead make the same sc.install_packages call inside test.py.
      • If using it from HDFS, make sure to upload it ;)
      • You can also use it with pyspark above, but the file must then be on the local host that you run pyspark from.
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3 \
--conf spark.pyspark.virtualenv.enabled=true \
--conf spark.pyspark.virtualenv.type=native \
--conf spark.pyspark.virtualenv.bin.path=/usr/bin/virtualenv \
--conf spark.pyspark.virtualenv.python_version=3.6 \
--conf spark.pyspark.virtualenv.requirements=hdfs:///share/python-envs/python3-venv.requirements.txt \
--master yarn \
--deploy-mode cluster \
test.py
  4. Update Zeppelin %livy2.pyspark:
    • In Zeppelin: open the menu at the top right -> Interpreters -> add to the livy2 interpreter:
livy.spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3
livy.spark.pyspark.virtualenv.enabled=true
livy.spark.pyspark.virtualenv.type=native
livy.spark.pyspark.virtualenv.bin.path=/usr/bin/virtualenv
livy.spark.pyspark.virtualenv.python_version=3.7
livy.spark.pyspark.virtualenv.requirements=hdfs:///share/python-envs/python3-venv.requirements.txt
    • The requirements property is optional.
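
Because the run-time method depends on the interpreter and the virtualenv binary existing at the configured paths on every host, it can help to check those paths before submitting. A minimal sketch that checks the local host only; the helper name `check_executables` is hypothetical, and how it is run on each NodeManager (ssh, a configuration management tool, and so on) is left as an assumption:

```shell
# Hypothetical helper: report whether each given path is an executable file.
# Returns non-zero if any path is missing or not executable.
check_executables() {
  local missing=0 path
  for path in "$@"; do
    if [ -x "${path}" ] && [ ! -d "${path}" ]; then
      echo "ok: ${path}"
    else
      echo "MISSING: ${path}" >&2
      missing=1
    fi
  done
  return "${missing}"
}

# Example usage on one host (paths taken from the configuration above):
#   check_executables /usr/bin/python3 /usr/bin/virtualenv
```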