@rom1504
Last active August 7, 2023 02:04
spark on slurm

Steps:

  1. wget https://gist.github.com/rom1504/67ada3dedbecc113ae2dbdfd9c642d83/raw/865fb35e00f21330b5b82aeb7c31941b6c18f649/spark_on_slurm.sh
  2. wget https://gist.github.com/rom1504/67ada3dedbecc113ae2dbdfd9c642d83/raw/865fb35e00f21330b5b82aeb7c31941b6c18f649/worker_spark_on_slurm.sh
  3. wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz && tar xf spark-3.3.1-bin-hadoop3.tgz
  4. sbatch spark_on_slurm.sh
  5. build a venv and install pyspark (a sketch follows below), then run the Python snippet further down

(you can get https://huggingface.co/datasets/laion/laion-coco/resolve/main/part-00000-2256f782-126f-4dc6-b9c6-e6757637749d-c000.snappy.parquet as an example parquet)
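Building the venv for step 5 might look like this (a minimal sketch; the python3 binary name, the .env path, and the pyspark version pin are assumptions rather than part of the gist):

python3 -m venv .env
source .env/bin/activate
pip install pyspark==3.3.1   # match the Spark version downloaded in step 3

Then, from inside the venv, run something like: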

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.submit.deployMode", "client")
    .config("spark.executor.memory", "16GB")         # heap memory per executor
    .config("spark.executor.memoryOverhead", "8GB")  # off-heap overhead per executor
    .config("spark.task.maxFailures", "100")         # retry flaky tasks instead of failing the job
    .master("spark://master_node:7077")              # replace master_node, see below
    .appName("spark-stats")
    .getOrCreate()
)

df = spark.read.parquet("part-00000-2256f782-126f-4dc6-b9c6-e6757637749d-c000.snappy.parquet")

df.count()

Replace master_node with the hostname of the first node allocated to your Slurm job (this is the MASTER_ADDR that worker_spark_on_slurm.sh prints).
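One way to find that hostname, assuming the job was submitted with the spark_on_slurm.sh below (<JOBID> is a placeholder for your job id):

# expand the job's nodelist and take the first host
scontrol show hostnames "$(squeue -h -j <JOBID> -o %N)" | head -n 1
# or read it from the sbatch output file (named via --output=%x_%j.out)
grep "MASTER ADDR" spark_on_slurm_<JOBID>.out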

You can check the Spark UI by tunneling the ports, along these lines:

ssh -L 4040:localhost:4040 -L 8080:localhost:8080 login_node

then, from the login node:

ssh -L localhost:4040:master_node:4040 -L localhost:8080:master_node:8080 master_node

and open http://localhost:8080 (the master UI) and http://localhost:4040 (the application UI) in your browser
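If your ssh client supports jump hosts, the two tunnels can be collapsed into one command (an alternative sketch, assuming master_node accepts ssh connections from login_node):

ssh -J login_node -L 4040:localhost:4040 -L 8080:localhost:8080 master_node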

spark_on_slurm.sh

#!/bin/bash
#SBATCH --partition=gpu
#SBATCH --job-name=spark_on_slurm
#SBATCH --nodes 2
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task=48
#SBATCH --mem=0 # 0 means request all of the node's memory (--mem is otherwise given in MB)
#SBATCH --output=%x_%j.out
#SBATCH --comment bild
#SBATCH --exclusive
srun --comment bild bash worker_spark_on_slurm.sh
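After sbatch, it can be useful to confirm the allocation and watch the daemons come up (standard Slurm commands; the output file name follows from --output=%x_%j.out and <JOBID> is a placeholder):

squeue -u $USER                      # wait until the job shows state R
tail -f spark_on_slurm_<JOBID>.out   # master and worker startup logs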

worker_spark_on_slurm.sh

#!/bin/bash
#
# get environment variables
GLOBAL_RANK=$SLURM_PROCID
CPUS=$(grep -c ^processor /proc/cpuinfo)
MEM=$(($(grep MemTotal /proc/meminfo | awk '{print $2}') / 1000)) # MemTotal is in kB, so this is roughly MB
MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)
LOCAL_IP=$(hostname -I | awk '{print $1}')
LOCALDIR=/scratch/spark
export SPARK_WORKER_DIR=$LOCALDIR/work
export SPARK_LOCAL_DIRS=$LOCALDIR/local
# setup the master node
if [ "$GLOBAL_RANK" -eq 0 ]
then
    # print out some info
    echo -e "MASTER ADDR: $MASTER_ADDR\tGLOBAL RANK: $GLOBAL_RANK\tCPUS PER TASK: $CPUS\tMEM PER NODE: $MEM"
    # then start the spark master node in the background
    ./spark-3.3.1-bin-hadoop3/sbin/start-master.sh -p 7077 -h "$LOCAL_IP"
fi
sleep 10
# then start the spark worker node in the background
MEM_IN_GB=$(($MEM / 1000))
# concat a "G" to the end of the memory string
MEM_IN_GB="$MEM_IN_GB"G
echo "MEM IN GB: $MEM_IN_GB"
./spark-3.3.1-bin-hadoop3/sbin/start-worker.sh -c $CPUS -m $MEM_IN_GB "spark://$MASTER_ADDR:7077"
echo "Hello from worker $GLOBAL_RANK"
sleep 10
if [ "$GLOBAL_RANK" -eq 0 ]
then
    # then start some script
    echo "hi"
fi
sleep 1000000 # keep the job alive so the spark master and workers stay up
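The echo "hi" above is just a placeholder for "start some script"; instead of attaching a driver from a login node, rank 0 could launch it directly. A minimal sketch, assuming the venv from step 5 lives in ./.env, the Python snippet above is saved as spark_stats.py, and its master URL points at spark://$MASTER_ADDR:7077 instead of master_node:

if [ "$GLOBAL_RANK" -eq 0 ]
then
    source .env/bin/activate   # venv with pyspark installed (assumed path)
    python spark_stats.py      # the driver connects to the master started above
fi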

salanki commented Nov 21, 2022

Does this really start more than 1 worker, since the worker command isn’t launched by an srun?


rom1504 commented Nov 21, 2022

Oh yeah you're right. Will fix


rom1504 commented Nov 21, 2022

now fixed
