@nousr
Last active November 18, 2022 23:30
#!/bin/bash
# indexer.sh -- launched once per node by the sbatch script further down in this gist
# step 1: get environment variables
# step 2: set up rank 0 as the master and start the indexing python file (those two operations happen in parallel)
# step 3: set up ranks 1-N as workers and start the spark worker script (they then listen for work from the master)
# (a quick sanity check that the workers registered is sketched right after this script)
# get environment variables
GLOBAL_RANK=$SLURM_PROCID
CPUS=$SLURM_CPUS_PER_TASK
MEM=$SLURM_MEM_PER_NODE # seems to be in MB
MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)
LOCAL_IP=$(hostname -I | awk '{print $1}')
# set some environment variables for the indexing script
export MASTER_ADDR=$MASTER_ADDR
export MEMORY=$MEM
export CPUS=$CPUS
export SPARK_LOCAL_IP=$LOCAL_IP
# setup the master node
if [ "$GLOBAL_RANK" -eq 0 ]
then
# print out some info
echo -e "MASTER ADDR: $MASTER_ADDR\tGLOBAL RANK: $GLOBAL_RANK\tCPUS PER TASK: $CPUS\tMEM PER NODE: $MEM"
# then start the spark master node in the background
./spark-3.3.1-bin-hadoop3/sbin/start-master.sh -p 7077 -h $LOCAL_IP &
# then start the indexing python script
./autofaiss.pex indexing.py
fi
# setup the worker nodes
if [ "$GLOBAL_RANK" -ne 0 ]
then
# then start the spark worker node in the background
MEM_IN_GB=$((MEM / 1024)) # SLURM reports memory in MB; divide by 1024 to match the conversion in indexing.py
# append a "G" so spark parses the value as gigabytes
MEM_IN_GB="${MEM_IN_GB}G"
echo "MEM IN GB: $MEM_IN_GB"
./spark-3.3.1-bin-hadoop3/sbin/start-worker.sh -c $CPUS -m $MEM_IN_GB "spark://$MASTER_ADDR:7077" &
echo "Hello from worker $GLOBAL_RANK"
fi
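
Not part of the original gist, but a quick way to confirm the topology described in the step comments above: the standalone master serves a web UI (port 8080 by default) that lists every registered worker, so a hedged sanity check from the rank-0 node could look like this.

# hypothetical check, run on the master node after the workers have started:
# the master web UI lists one "worker-..." entry per registered worker
curl -s "http://$LOCAL_IP:8080" | grep -o 'worker-[0-9][^"<]*' | sort -u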

# indexing.py
import os

from autofaiss import build_index
from pyspark.sql import SparkSession
# set environment variables
os.environ['PYSPARK_PYTHON'] = "/fsx/nousr/indexing/autofaiss.pex"
# get spark session environment variables
MASTER_ADDR = str(os.environ['MASTER_ADDR'])
# get the machine configuration
TOTAL_MEMORY = int(os.environ['MEMORY']) // 1024 # SLURM reports MB; convert to GB
EXECUTOR_MEMORY_OVERHEAD = int(TOTAL_MEMORY * 0.1) # overhead is typically 6-10% of executor memory
EXECUTOR_MEMORY = int(TOTAL_MEMORY - EXECUTOR_MEMORY_OVERHEAD)
CPUS = str(os.environ['CPUS'])
# print the vars to make sure they're correct
print("\n--------------------")
print(f"MASTER_ADDR: {MASTER_ADDR}")
print(f"TOTAL_MEMORY: {TOTAL_MEMORY}")
print(f"EXECUTOR_MEMORY_OVERHEAD: {EXECUTOR_MEMORY_OVERHEAD}")
print(f"EXECUTOR_MEMORY: {EXECUTOR_MEMORY}")
print(f"CORES: {CPUS}")
print("--------------------\n")
def create_spark_session():
    spark = (
        SparkSession.builder
        .config("spark.submit.deployMode", "client")
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.task.cpus", f"{CPUS}")
        .config("spark.driver.port", "5678")
        .config("spark.driver.blockManager.port", "6678")
        .config("spark.driver.host", f"{MASTER_ADDR}")
        .config("spark.driver.bindAddress", f"{MASTER_ADDR}")
        .config("spark.executor.memory", f"{EXECUTOR_MEMORY}G") # make sure to increase this if you're using more cores per executor
        .config("spark.executor.memoryOverhead", f"{EXECUTOR_MEMORY_OVERHEAD}G")
        .config("spark.task.maxFailures", "100")
        .config("spark.port.maxRetries", "100")
        .master(f"spark://{MASTER_ADDR}:7077") # this should point to your master node; if using the tunnelling version, keep this as localhost
        .appName("spark-stats")
        .getOrCreate()
    )
    return spark
spark = create_spark_session()
index, index_infos = build_index(
    embeddings=["s3://s-laion/vit-h-14-embeddings/img_emb"],
    distributed="pyspark",
    file_format="npy",
    max_index_memory_usage="500G",
    current_memory_available="800G",
    temporary_indices_folder="s3://s-laion/vit-h-14-indices/laion2b-en/tmp/",
    index_path="s3://s-laion/vit-h-14-indices/laion2b-en/indices/",
    index_infos_path="s3://s-laion/vit-h-14-indices/laion2b-en/infos.json",
)
#!/bin/bash
#SBATCH --partition=gpu
#SBATCH --job-name=indexing
#SBATCH --nodes 3
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task=48
#SBATCH --mem=0 # 0 requests all available memory on each node (--mem is otherwise given in MB)
#SBATCH --output=%x_%j.out
#SBATCH --comment dalle2
#SBATCH --exclusive
# launch the interior script on each node
srun --comment dalle2 indexer.sh
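
Usage note (not part of the gist): assuming the sbatch file above is saved as start_indexing.sbatch (the filename is an assumption), the whole pipeline would be launched from the login node like this.

# submit the job: SLURM allocates 3 exclusive nodes and runs indexer.sh once per node
sbatch start_indexing.sbatch
# follow the combined log produced by --output=%x_%j.out (job name + job id)
tail -f indexing_<jobid>.out
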
@rom1504 commented Nov 18, 2022

changes to make (a sketch of both is given below):

  • remove &
  • use ip
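
One reading of those two items (an interpretation, not a patch from the gist): the trailing & is redundant because the sbin start scripts daemonize on their own, and since the master is started with -h $LOCAL_IP, the workers should connect to that same IP rather than to the SLURM hostname. A minimal sketch of the worker branch in indexer.sh with both changes, assuming the master's IP can be resolved from its hostname with getent:

# sketch of the worker branch with the two suggested changes applied
# resolve the SLURM hostname of rank 0 to the IP the master actually bound to
MASTER_IP=$(getent hosts "$MASTER_ADDR" | awk '{print $1}')
MEM_IN_GB="$((MEM / 1024))G"
# no trailing &: start-worker.sh already backgrounds the worker daemon
./spark-3.3.1-bin-hadoop3/sbin/start-worker.sh -c "$CPUS" -m "$MEM_IN_GB" "spark://$MASTER_IP:7077"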
