Python 3.8 with Livy on EMR 6

Unfortunately, Livy does not currently support PySpark with Python >= 3.8.

However, we can patch Livy on the EMR primary node at cluster start to work around this.

This example will also show how to populate Core and Task nodes with Docker images on startup.

Overview

  1. A bootstrap action to execute docker pull on all nodes
  2. An EMR Step to patch and restart Livy on the primary node
  3. Livy Timeout modification script from last time (included at the end for reference)

Scripts

  1. docker pull to update images

We use a bootstrap action because it runs on every node. Since we only want to pull images on Core and Task nodes, the script does a quick check for the primary node and exits early there; otherwise it iterates through the images passed in as arguments and pulls each one.
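
The primary-node check relies on the instance metadata EMR writes to /mnt/var/lib/info/instance.json on every node. If you'd rather not parse JSON with grep and awk, here's a sketch of the same check using jq instead (verify jq is present on your EMR release before relying on it):

#!/bin/sh

# Alternative primary-node check using jq
if [ "$(jq -r .isMaster /mnt/var/lib/info/instance.json)" = "true" ]; then
    echo "This is the primary node, exiting."
    exit 0
fi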

Note that the pull has to run in the background, or the cluster will fail to start because the bootstrap action takes too long.

#!/bin/sh

# First we ensure this is *not* the primary node
if [ `grep 'isMaster' /mnt/var/lib/info/instance.json | awk -F ':' '{print $2}' | awk -F ',' '{print $1}'` = 'true' ]; then
	echo "This is the primary node, exiting."
	exit 0
fi

# Then we create a script that can execute in the background.
# Otherwise, the bootstrap will wait too long and fail the cluster startup.
cat >/tmp/docker_wait.sh <<EOF
#!/bin/sh

# Wait for Docker to be installed
echo -n "Waiting for Docker daemon to start..."
while ! pgrep dockerd >/dev/null; do echo -n "."; sleep 1; done

echo -e "\nDocker is running, populating images"

# Define some variables used throughout (replace with your own region and account ID)
AWS_REGION=us-west-2
AWS_ACCOUNT_ID=568026268536
TARGET_REGISTRY=\${AWS_ACCOUNT_ID}.dkr.ecr.\${AWS_REGION}.amazonaws.com

# First we need to authenticate
aws ecr get-login-password --region \${AWS_REGION} | sudo docker login --username AWS --password-stdin \${TARGET_REGISTRY}

# Then we'll iterate through any arguments passed to this and do a docker pull
for image in $@
do
    echo "Pulling Docker image: \$image";
    sudo docker pull \$image
done
EOF

# Execute the script in the background, logging its output for debugging
chmod +x /tmp/docker_wait.sh
nohup /tmp/docker_wait.sh > /tmp/docker_wait.log 2>&1 &

Upload this script to S3.

aws s3 cp docker_pull.sh s3://<BUCKET>/artifacts/bootstrap/

  2. Patch and Restart Livy

We could either provide a custom RPM or just patch Livy on startup. Since Livy is fairly static, we'll just patch the file in place.

We'll use an EMR Step, as that ensures that 1/ Livy is already installed and 2/ the script only runs on the primary node.

#!/bin/sh

echo "Patching Livy for Python 3.8 support"

LIVY_JAR=/usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.0-incubating.jar

# Unzip the fake_shell.py file to a tmp location
cd /tmp
jar xf ${LIVY_JAR} fake_shell.py

# Now patch the file with the changes from https://github.com/apache/incubator-livy/pull/314
patch --ignore-whitespace << 'EOF'
From 81a4e92b5abbc65f27b35454a8f28a72b8f4179b Mon Sep 17 00:00:00 2001
From: Gabriel Magno <gabrielmagno1@gmail.com>
Date: Wed, 6 Jan 2021 16:07:18 -0300
Subject: [PATCH] Mock ast.Module to work with Python 3.8

---
 repl/src/main/resources/fake_shell.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/repl/src/main/resources/fake_shell.py b/repl/src/main/resources/fake_shell.py
index 2a99185b1..b5b284d8f 100644
--- a/repl/src/main/resources/fake_shell.py
+++ b/repl/src/main/resources/fake_shell.py
@@ -40,6 +40,14 @@
     import cStringIO
     import StringIO
 
+if sys.version_info > (3,8):
+    from ast import Module
+else :
+    # mock the new API, ignore second argument
+    # see https://github.com/ipython/ipython/issues/11590
+    from ast import Module as OriginalModule
+    Module = lambda nodelist, type_ignores: OriginalModule(nodelist)
+
 logging.basicConfig()
 LOG = logging.getLogger('fake_shell')
 
@@ -219,7 +227,7 @@ def execute(self):
 
         try:
             for node in to_run_exec:
-                mod = ast.Module([node])
+                mod = Module([node], [])
                 code = compile(mod, '<stdin>', 'exec')
                 exec(code, global_dict)
EOF

# And finally, update the Livy jar and restart Livy
sudo jar uvf ${LIVY_JAR} fake_shell.py
sudo systemctl restart livy-server

Now upload this to S3 as well.

aws s3 cp livy_py38_patch.sh s3://<BUCKET>/artifacts/steps/
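
If you want to sanity-check that the patch landed, you can re-extract fake_shell.py from the jar on the primary node after the step completes and look for the shim. This is a quick manual check, not part of the automated setup:

#!/bin/sh

# Pull the (hopefully patched) file back out of the jar and look for the version shim
cd /tmp
jar xf /usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.0-incubating.jar fake_shell.py
grep -n 'sys.version_info' fake_shell.py && echo "Patch is in place"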

Creating the cluster

Now let's create a cluster with our Bootstrap Actions and Steps!

KEYPAIR=<ssh_keypair>
SUBNET_ID=<subnet_id>
REGION=<REGION>
ACCOUNT_ID=<ACCOUNT_ID>
LOG_BUCKET=aws-logs-${ACCOUNT_ID}-${REGION}

LIVY_PY38_SCRIPT=s3://<BUCKET>/artifacts/steps/livy_py38_patch.sh
LIVY_TIMEOUT_SCRIPT=s3://<BUCKET>/artifacts/steps/change_livy_timeout.sh
DOCKER_PULL_SCRIPT=s3://<BUCKET>/artifacts/bootstrap/docker_pull.sh


aws emr create-cluster --name "emr-docker-spark" \
    --region ${REGION} \
    --release-label emr-6.3.0 \
    --ebs-root-volume-size 100 \
    --enable-debugging \
    --log-uri "s3n://${LOG_BUCKET}/elasticmapreduce/" \
    --applications Name=Spark Name=Livy Name=JupyterEnterpriseGateway \
    --ec2-attributes KeyName=${KEYPAIR},SubnetId=${SUBNET_ID} \
    --use-default-roles \
    --bootstrap-actions '[{"Path":"'${DOCKER_PULL_SCRIPT}'","Name":"DockerPull","Args":["'${ACCOUNT_ID}'.dkr.ecr.'${REGION}'.amazonaws.com/<REPO>:<IMAGE_TAG_01>","'${ACCOUNT_ID}'.dkr.ecr.'${REGION}'.amazonaws.com/<REPO>:<IMAGE_TAG_02>"]}]' \
    --steps '[
        {"Type":"CUSTOM_JAR","Name":"IncreaseLivySessionTimeout","ActionOnFailure":"CONTINUE","Jar":"s3://'${REGION}'.elasticmapreduce/libs/script-runner/script-runner.jar","Args":["'${LIVY_TIMEOUT_SCRIPT}'"]},
        {"Type":"CUSTOM_JAR","Name":"Python38LivyPatch","ActionOnFailure":"CONTINUE","Jar":"s3://'${REGION}'.elasticmapreduce/libs/script-runner/script-runner.jar","Args":["'${LIVY_PY38_SCRIPT}'"]}
    ]' \
    --instance-groups '[{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":75,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"CORE"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"MASTER"}]' \
    --configurations '[
    {
        "Classification": "container-executor",
        "Configurations": [
            {
                "Classification": "docker",
                "Properties": {
                    "docker.trusted.registries": "local,'${ACCOUNT_ID}'.dkr.ecr.'${REGION}'.amazonaws.com",
                    "docker.privileged-containers.registries": "local,'${ACCOUNT_ID}'.dkr.ecr.'${REGION}'.amazonaws.com"
                }
            }
        ]
    },
    {
        "Classification":"livy-conf",
        "Properties":{
            "livy.spark.master":"yarn",
            "livy.server.session.timeout":"16h",
            "livy.server.yarn.app-lookup-timeout": "600s",
            "livy.rsc.server.connect.timeout": "600s"
        }
    },
    {
        "Classification":"spark-defaults",
        "Properties":{
            "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE":"docker",
            "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE":"docker",
            "spark.executor.instances":"2",
            "spark.pyspark.virtualenv.enabled": "false"
        }
    }
]'
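
If you already have a running cluster, you don't need to recreate it; the same patch can be submitted as a step with add-steps. A sketch, where <CLUSTER_ID> is a placeholder for your cluster's ID:

aws emr add-steps --cluster-id <CLUSTER_ID> \
    --steps '[{"Type":"CUSTOM_JAR","Name":"Python38LivyPatch","ActionOnFailure":"CONTINUE","Jar":"s3://'${REGION}'.elasticmapreduce/libs/script-runner/script-runner.jar","Args":["'${LIVY_PY38_SCRIPT}'"]}]'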

Livy timeout script

For reference, here's the change_livy_timeout.sh script from last time. It inserts the livy_session_startup_timeout_seconds setting (raising it to 10 minutes) into the notebook kernel launcher on the primary node. Upload it to s3://<BUCKET>/artifacts/steps/ so the IncreaseLivySessionTimeout step above can find it.

#!/bin/sh

# Raise the Livy session startup timeout used by EMR notebook kernels
sudo sed -i '34 i "livy_session_startup_timeout_seconds": 600,' /mnt/notebook-env/bin/kernel_launcher.sh
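
Once everything is up, a quick way to confirm that PySpark sessions work end to end is to hit Livy's REST API from the primary node. This is a sketch using Livy's standard endpoints; session and statement IDs start at 0 on a fresh server, so adjust them if you've already created sessions:

# Create a PySpark session
curl -s -X POST -H 'Content-Type: application/json' \
    -d '{"kind": "pyspark"}' http://localhost:8998/sessions

# Once the session reports an "idle" state, run a statement that prints the Python version
curl -s -X POST -H 'Content-Type: application/json' \
    -d '{"code": "import sys; print(sys.version)"}' http://localhost:8998/sessions/0/statements

# Fetch the statement output
curl -s http://localhost:8998/sessions/0/statements/0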