@nousr
Last active November 18, 2022 23:30
#!/bin/bash
# indexer.sh
# step 1: read the environment variables provided by slurm
# step 2: set up rank 0 as the master and start the indexing python script (the two operations run in parallel)
# step 3: set up ranks 1-N as workers; they then listen for work from the master

# get environment variables
GLOBAL_RANK=$SLURM_PROCID
CPUS=$SLURM_CPUS_PER_TASK
MEM=$SLURM_MEM_PER_NODE # appears to be in MB
MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)
LOCAL_IP=$(hostname -I | awk '{print $1}')

# export them for the indexing script
export MASTER_ADDR
export MEMORY=$MEM
export CPUS
export SPARK_LOCAL_IP=$LOCAL_IP

if [ "$GLOBAL_RANK" == 0 ]; then
    # set up the master node: print out some info first
    echo -e "MASTER ADDR: $MASTER_ADDR\tGLOBAL RANK: $GLOBAL_RANK\tCPUS PER TASK: $CPUS\tMEM PER NODE: $MEM"
    # start the spark master node in the background
    ./spark-3.3.1-bin-hadoop3/sbin/start-master.sh -p 7077 -h "$LOCAL_IP" &
    # then start the indexing python script
    ./autofaiss.pex indexing.py
else
    # set up a worker node: convert the memory to GB and append the "G" suffix spark expects
    MEM_IN_GB="$(($MEM / 1000))G"
    echo "MEM IN GB: $MEM_IN_GB"
    # start the spark worker node in the background
    ./spark-3.3.1-bin-hadoop3/sbin/start-worker.sh -c "$CPUS" -m "$MEM_IN_GB" "spark://$MASTER_ADDR:7077" &
    echo "Hello from worker $GLOBAL_RANK"
fi
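
Since both sbin scripts fork daemons, it can help to confirm they actually came up on a node before digging through the comments' logs below. A minimal check, not part of the original script (the log path is assumed to be Spark's default, $SPARK_HOME/logs):

# sketch: verify the spark daemons on this node and peek at their logs
jps | grep -E 'Master|Worker'                      # JVMs spawned by the sbin scripts
tail -n 20 ./spark-3.3.1-bin-hadoop3/logs/*.out    # default daemon log location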
# indexing.py
import os

from autofaiss import build_index
from pyspark.sql import SparkSession

# make the executors use the pex as their python
os.environ["PYSPARK_PYTHON"] = "/fsx/nousr/indexing/autofaiss.pex"

# get spark session environment variables
MASTER_ADDR = str(os.environ["MASTER_ADDR"])

# get the machine configuration
TOTAL_MEMORY = int(os.environ["MEMORY"]) // 1024  # in MB, convert to GB
EXECUTOR_MEMORY_OVERHEAD = int(TOTAL_MEMORY * 0.1)  # should be roughly 6-10% of executor memory
EXECUTOR_MEMORY = TOTAL_MEMORY - EXECUTOR_MEMORY_OVERHEAD
CPUS = str(os.environ["CPUS"])

# print the vars to make sure they're correct
print("\n--------------------")
print(f"MASTER_ADDR: {MASTER_ADDR}")
print(f"TOTAL_MEMORY: {TOTAL_MEMORY}")
print(f"EXECUTOR_MEMORY_OVERHEAD: {EXECUTOR_MEMORY_OVERHEAD}")
print(f"EXECUTOR_MEMORY: {EXECUTOR_MEMORY}")
print(f"CORES: {CPUS}")
print("--------------------\n")


def create_spark_session():
    spark = (
        SparkSession.builder
        .config("spark.submit.deployMode", "client")
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.task.cpus", f"{CPUS}")
        .config("spark.driver.port", "5678")
        .config("spark.driver.blockManager.port", "6678")
        .config("spark.driver.host", f"{MASTER_ADDR}")
        .config("spark.driver.bindAddress", f"{MASTER_ADDR}")
        .config("spark.executor.memory", f"{EXECUTOR_MEMORY}G")  # make sure to increase this if you use more cores per executor
        .config("spark.executor.memoryOverhead", f"{EXECUTOR_MEMORY_OVERHEAD}G")
        .config("spark.task.maxFailures", "100")
        .config("spark.port.maxRetries", "100")
        .master(f"spark://{MASTER_ADDR}:7077")  # point this at your master node; if using the tunnelling version, keep it at localhost
        .appName("spark-stats")
        .getOrCreate()
    )
    return spark


spark = create_spark_session()

index, index_infos = build_index(
    embeddings=["s3://s-laion/vit-h-14-embeddings/img_emb"],
    distributed="pyspark",
    file_format="npy",
    max_index_memory_usage="500G",
    current_memory_available="800G",
    temporary_indices_folder="s3://s-laion/vit-h-14-indices/laion2b-en/tmp/",
    index_path="s3://s-laion/vit-h-14-indices/laion2b-en/indices/",
    index_infos_path="s3://s-laion/vit-h-14-indices/laion2b-en/infos.json",
)
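
Once build_index returns, the finished index sits at index_path on S3. A hedged sketch of querying it afterwards (the exact file name under indices/ is an assumption):

# sketch: copy the index down first, e.g.
#   aws s3 cp s3://s-laion/vit-h-14-indices/laion2b-en/indices/knn.index .
# then load and query it with faiss ("knn.index" is an assumed name)
import faiss
import numpy as np

index = faiss.read_index("knn.index")
query = np.random.rand(1, index.d).astype("float32")  # one random query vector
distances, ids = index.search(query, 5)               # 5 nearest neighbours
print(distances, ids)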
#!/bin/bash
#SBATCH --partition=gpu
#SBATCH --job-name=indexing
#SBATCH --nodes=3
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=48
#SBATCH --mem=0 # 0 means use all available memory (in MB)
#SBATCH --output=%x_%j.out
#SBATCH --comment dalle2
#SBATCH --exclusive
# launch the interior script on each node
srun --comment dalle2 indexer.sh
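
Assuming the two files above sit next to the spark directory and the pex as indexer.sh and indexing.py, and this launcher is saved as, say, indexing.sbatch (the file name is an assumption), the whole job is kicked off with:

chmod +x indexer.sh autofaiss.pex   # srun and PYSPARK_PYTHON invoke both directly
sbatch indexing.sbatch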
@nousr commented Nov 17, 2022

logs from a worker

Spark Command: /usr/lib/jvm/java-17-amazon-corretto.x86_64/bin/java -cp /fsx/nousr/indexing/spark-3.2.1-bin-hadoop3.2/conf/:/fsx/nousr/indexing/spark-3.2.1-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 -c 48 -m 1120G spark://gpu-st-p4d-24xlarge-16:7077
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/11/17 00:07:48 INFO Worker: Started daemon with process name: 25836@gpu-st-p4d-24xlarge-17
22/11/17 00:07:48 INFO SignalUtils: Registering signal handler for TERM
22/11/17 00:07:48 INFO SignalUtils: Registering signal handler for HUP
22/11/17 00:07:48 INFO SignalUtils: Registering signal handler for INT
22/11/17 00:07:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/17 00:07:48 INFO SecurityManager: Changing view acls to: xxxx
22/11/17 00:07:48 INFO SecurityManager: Changing modify acls to: xxxx
22/11/17 00:07:48 INFO SecurityManager: Changing view acls groups to: 
22/11/17 00:07:48 INFO SecurityManager: Changing modify acls groups to: 
22/11/17 00:07:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(xxxx); groups with view permissions: Set(); users  with modify permissions: Set(xxxx); groups with modify permissions: Set()
22/11/17 00:07:48 INFO Utils: Successfully started service 'sparkWorker' on port 42961.
22/11/17 00:07:48 INFO Worker: Worker decommissioning not enabled.
22/11/17 00:07:49 INFO Worker: Starting Spark worker 172.31.235.211:42961 with 48 cores, 1120.0 GiB RAM
22/11/17 00:07:49 INFO Worker: Running Spark version 3.2.1
22/11/17 00:07:49 INFO Worker: Spark home: /fsx/nousr/indexing/spark-3.2.1-bin-hadoop3.2
22/11/17 00:07:49 INFO ResourceUtils: ==============================================================
22/11/17 00:07:49 INFO ResourceUtils: No custom resources configured for spark.worker.
22/11/17 00:07:49 INFO ResourceUtils: ==============================================================
22/11/17 00:07:49 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
22/11/17 00:07:49 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://gpu-st-p4d-24xlarge-17.hpc-1click-sandbox.pcluster:8081
22/11/17 00:07:49 INFO Worker: Connecting to master gpu-st-p4d-24xlarge-16:7077...
22/11/17 00:07:49 INFO TransportClientFactory: Successfully created connection to gpu-st-p4d-24xlarge-16/172.31.225.214:7077 after 19 ms (0 ms spent in bootstraps)
22/11/17 00:07:49 INFO Worker: Successfully registered with master spark://gpu-st-p4d-24xlarge-16.hpc-1click-sandbox.pcluster:7077
22/11/17 00:08:16 INFO Worker: gpu-st-p4d-24xlarge-16:7077 Disassociated !
22/11/17 00:08:16 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
22/11/17 00:08:16 INFO Worker: gpu-st-p4d-24xlarge-16.hpc-1click-sandbox.pcluster:7077 Disassociated !
22/11/17 00:08:16 INFO Worker: Connecting to master gpu-st-p4d-24xlarge-16:7077...
22/11/17 00:08:16 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
22/11/17 00:08:16 INFO Worker: Not spawning another attempt to register with the master, since there is an attempt scheduled already.
22/11/17 00:08:16 INFO TransportClientFactory: Found inactive connection to gpu-st-p4d-24xlarge-16/172.31.225.214:7077, creating a new one.
22/11/17 00:08:16 WARN Worker: Failed to connect to master gpu-st-p4d-24xlarge-16:7077
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:109)
	at org.apache.spark.deploy.worker.Worker$$anon$1.run(Worker.scala:311)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Failed to connect to gpu-st-p4d-24xlarge-16/172.31.225.214:7077
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
	... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: gpu-st-p4d-24xlarge-16/172.31.225.214:7077
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

@nousr commented Nov 17, 2022

the above error was resolved by specifying the host IP when launching the worker
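
For reference, a sketch of the amended worker launch, binding the worker to its own IP with -h just as the master launch already does:

./spark-3.3.1-bin-hadoop3/sbin/start-worker.sh -c "$CPUS" -m "$MEM_IN_GB" -h "$LOCAL_IP" "spark://$MASTER_ADDR:7077" &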

@nousr commented Nov 17, 2022

raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x3bb50eaa) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x3bb50eaa

this error is likely due to a mismatch between the spark and JVM versions (make sure your pyspark version matches too)
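
A quick way to spot such a mismatch (Spark 3.2.x runs on Java 8/11, while Java 17 needs Spark 3.3+; the pex is assumed to run in interpreter mode, as its use as PYSPARK_PYTHON implies):

java -version
./spark-3.3.1-bin-hadoop3/bin/spark-submit --version
./autofaiss.pex -c "import pyspark; print(pyspark.__version__)"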

@rom1504 commented Nov 18, 2022

wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz && tar xf spark-3.3.1-bin-hadoop3.tgz

wget https://github.com/criteo/autofaiss/releases/latest/download/autofaiss-3.8.pex -O autofaiss.pex && chmod +x autofaiss.pex

@rom1504 commented Nov 18, 2022

changes to make:

  • remove the trailing & (the sbin start scripts already daemonize; see the sketch below)
  • use the ip rather than the hostname
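
Applied to the master block of indexer.sh, that would look like (a sketch; start-master.sh returns once the daemon is spawned, so the script still reaches the indexing step):

./spark-3.3.1-bin-hadoop3/sbin/start-master.sh -p 7077 -h "$LOCAL_IP"
./autofaiss.pex indexing.py

The worker line loses its & the same way and keeps the -h "$LOCAL_IP" fix from the earlier comment.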
