Last active: November 18, 2022 23:30
indexer.sh
#!/bin/bash
# step 1: read the SLURM environment variables
# step 2: set up rank 0 as the master and start the indexing python script (the last two operations happen in parallel)
# step 3: set up ranks 1-N as workers and start the spark worker daemon (they then listen for work from the master)

# get environment variables
GLOBAL_RANK=$SLURM_PROCID
CPUS=$SLURM_CPUS_PER_TASK
MEM=$SLURM_MEM_PER_NODE # reported in MB
MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)
LOCAL_IP=$(hostname -I | awk '{print $1}')

# export variables for the indexing script
export MASTER_ADDR
export MEMORY=$MEM
export CPUS
export SPARK_LOCAL_IP=$LOCAL_IP

# set up the master node
if [ "$GLOBAL_RANK" -eq 0 ]
then
    # print out some info
    echo -e "MASTER ADDR: $MASTER_ADDR\tGLOBAL RANK: $GLOBAL_RANK\tCPUS PER TASK: $CPUS\tMEM PER NODE: $MEM"

    # start the spark master node in the background
    ./spark-3.3.1-bin-hadoop3/sbin/start-master.sh -p 7077 -h "$LOCAL_IP" &

    # then start the indexing python script
    ./autofaiss.pex indexing.py
else
    # set up the worker nodes: convert memory from MB to GB (integer division truncates)
    MEM_IN_GB=$(($MEM / 1000))
    # append a "G" suffix to the memory string
    MEM_IN_GB="${MEM_IN_GB}G"
    echo "MEM IN GB: $MEM_IN_GB"

    # start the spark worker daemon in the background
    ./spark-3.3.1-bin-hadoop3/sbin/start-worker.sh -c "$CPUS" -m "$MEM_IN_GB" "spark://$MASTER_ADDR:7077" &
    echo "Hello from worker $GLOBAL_RANK"
fi
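The worker branch above converts SLURM's per-node memory (reported in MB) into the `G`-suffixed string that `start-worker.sh -m` expects. A minimal sketch of that arithmetic (the helper name is mine, not part of the gist); note that integer division truncates, so a worker can advertise up to nearly 1 GB less than the node actually has:

```python
def mem_mb_to_spark_gb(mem_mb: int) -> str:
    """Mirror the script's MEM_IN_GB computation: integer-divide MB by 1000, append "G"."""
    return f"{mem_mb // 1000}G"

# e.g. a node reporting 191000 MB is advertised to Spark as "191G"
print(mem_mb_to_spark_gb(191000))
```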
indexing.py
import os

from autofaiss import build_index
from pyspark.sql import SparkSession

# point executors at the pex environment
os.environ['PYSPARK_PYTHON'] = "/fsx/nousr/indexing/autofaiss.pex"

# get spark session environment variables
MASTER_ADDR = str(os.environ['MASTER_ADDR'])

# get the machine configuration
TOTAL_MEMORY = int(os.environ['MEMORY']) // 1024  # reported in MB; convert to GB
EXECUTOR_MEMORY_OVERHEAD = int(TOTAL_MEMORY * 0.1)  # should be 6-10% of executor memory
EXECUTOR_MEMORY = TOTAL_MEMORY - EXECUTOR_MEMORY_OVERHEAD
CPUS = str(os.environ['CPUS'])

# print the vars to make sure they're correct
print("\n--------------------")
print(f"MASTER_ADDR: {MASTER_ADDR}")
print(f"TOTAL_MEMORY: {TOTAL_MEMORY}")
print(f"EXECUTOR_MEMORY_OVERHEAD: {EXECUTOR_MEMORY_OVERHEAD}")
print(f"EXECUTOR_MEMORY: {EXECUTOR_MEMORY}")
print(f"CORES: {CPUS}")
print("--------------------\n")


def create_spark_session():
    spark = (
        SparkSession.builder
        .config("spark.submit.deployMode", "client")
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.task.cpus", f"{CPUS}")
        .config("spark.driver.port", "5678")
        .config("spark.driver.blockManager.port", "6678")
        .config("spark.driver.host", f"{MASTER_ADDR}")
        .config("spark.driver.bindAddress", f"{MASTER_ADDR}")
        .config("spark.executor.memory", f"{EXECUTOR_MEMORY}G")  # increase this if you use more cores per executor
        .config("spark.executor.memoryOverhead", f"{EXECUTOR_MEMORY_OVERHEAD}G")
        .config("spark.task.maxFailures", "100")
        .config("spark.port.maxRetries", "100")
        .master(f"spark://{MASTER_ADDR}:7077")  # point this at your master node; if using the tunnelling version, keep it at localhost
        .appName("spark-stats")
        .getOrCreate()
    )
    return spark


spark = create_spark_session()

index, index_infos = build_index(
    embeddings=["s3://s-laion/vit-h-14-embeddings/img_emb"],
    distributed="pyspark",
    file_format="npy",
    max_index_memory_usage="500G",
    current_memory_available="800G",
    temporary_indices_folder="s3://s-laion/vit-h-14-indices/laion2b-en/tmp/",
    index_path="s3://s-laion/vit-h-14-indices/laion2b-en/indices/",
    index_infos_path="s3://s-laion/vit-h-14-indices/laion2b-en/infos.json",
)
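The executor memory settings above follow a simple split: convert the node's MB to GB, carve roughly 10% out as `spark.executor.memoryOverhead`, and give the rest to `spark.executor.memory`. A sketch of that arithmetic in isolation (the function name is mine, not part of the gist):

```python
def split_executor_memory(total_mb: int, overhead_fraction: float = 0.1):
    """Split a node's memory (MB) into executor memory and overhead, both in whole GB.

    Mirrors the script: total_gb = total_mb // 1024, overhead ~= 10% of the total,
    executor memory gets the remainder.
    """
    total_gb = total_mb // 1024
    overhead_gb = int(total_gb * overhead_fraction)
    executor_gb = total_gb - overhead_gb
    return executor_gb, overhead_gb

# e.g. a 192 GB node -> 173 GB for executors, 19 GB overhead
print(split_executor_memory(196608))
```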
sbatch launch script
#!/bin/bash
#SBATCH --partition=gpu
#SBATCH --job-name=indexing
#SBATCH --nodes 3
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task=48
#SBATCH --mem=0 # 0 means use all available memory (in MB)
#SBATCH --output=%x_%j.out
#SBATCH --comment dalle2
#SBATCH --exclusive

# launch the interior script on each node
srun --comment dalle2 indexer.sh
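With `--nodes 3` and `--ntasks-per-node 1`, `srun` launches one copy of `indexer.sh` per node; rank 0 becomes the Spark master (and driver), and every other rank becomes a worker. A sketch of that mapping (helper name is mine, not part of the gist):

```python
def spark_cluster_shape(nodes: int, tasks_per_node: int) -> dict:
    """Rank 0 hosts the Spark master + driver; all remaining ranks run workers."""
    total_ranks = nodes * tasks_per_node
    return {"master": 1, "workers": total_ranks - 1}

# the sbatch script above: 3 nodes x 1 task -> 1 master, 2 workers
print(spark_cluster_shape(3, 1))
```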
wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz && tar xf spark-3.3.1-bin-hadoop3.tgz
wget https://github.com/criteo/autofaiss/releases/latest/download/autofaiss-3.8.pex -O autofaiss.pex && chmod +x autofaiss.pex
Changes to make:
- remove the `&`
- use the IP
Minimal working script: https://gist.github.com/rom1504/67ada3dedbecc113ae2dbdfd9c642d83
This error is likely due to a mismatch of Spark/JVM versions (also make sure your pyspark version matches the Spark distribution).