minikube start --cpus=4 --memory=12g --nodes 4
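# optional sanity check (not part of the walkthrough): all four nodes should show up as Ready
kubectl get nodes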
kubectl create serviceaccount spark-sa
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark-sa --namespace=default
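# optional check that the service account can manage pods (the edit cluster role grants this)
kubectl auth can-i create pods --as=system:serviceaccount:default:spark-sa
# should print: yes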
# location for fallback storage
mkdir -p /tmp/spark-data
# create location on all minikube nodes
nodes=$(minikube node list | cut -f 1)
for node in $nodes; do
minikube ssh -n "$node" -- sudo mkdir -p /tmp/spark-data
done
# mount on the first minikube node; 185 is the uid/gid of the spark user in the apache/spark image
# (minikube mount keeps running in the foreground, so start it in a separate terminal or background it)
minikube mount --uid=185 --gid=185 /tmp/spark-data:/tmp/spark-data
# inspect mount details
minikube ssh -n minikube mount | grep spark-data
# this should show:
# 192.168.49.1 on /tmp/spark-data type 9p (rw,relatime,sync,dirsync,dfltuid=185,dfltgid=185,access=any,msize=262144,trans=tcp,noextend,port=36057)
# mount on all other minikube nodes
read ip on path type t options <<<$(minikube ssh -n minikube mount | grep spark-data)
nodes_but_first=$(minikube node list | cut -f 1 | tail -n +2)
for node in $nodes_but_first; do
minikube ssh -n "$node" -- sudo mount "$ip" "$path" -t "$t" -o "${options:1:-2}"
done
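# optional check (not in the original steps): every node should now show the 9p mount
for node in $nodes; do
echo "== $node =="
minikube ssh -n "$node" -- mount | grep spark-data
done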
kubectl cluster-info
This outputs, for example:
Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
Use the control plane URL as the Spark master:
k8s_url="https://192.168.49.2:8443"
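The same URL can also be read from the kubeconfig instead of copying it by hand (an alternative, assuming the minikube context is the current one):
k8s_url=$(kubectl config view --minify --output jsonpath='{.clusters[0].cluster.server}')
echo "$k8s_url"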
curl -L --output spark-3.5.5-bin-hadoop3.tgz "https://www.apache.org/dyn/closer.lua/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz?action=download"
tar -xzf spark-3.5.5-bin-hadoop3.tgz
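A quick way to check that the download and extraction worked (optional):
./spark-3.5.5-bin-hadoop3/bin/spark-submit --version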
The Scala code that we run:
read -r -d '' scala << 'EOF'
sc.setLogLevel("INFO")
import org.apache.spark.sql.SaveMode
val n = 100000000
val j = spark.sparkContext.broadcast(1000)
// 100 input partitions of ints from 0 until n
val x = spark.range(0, n, 1, 100).select($"id".cast("int"))
// the sleeps stagger the work: most map partitions are slowed down and one reduce group stalls,
// so some executors become idle early and can be decommissioned while their shuffle output is still needed
x.as[Int]
.mapPartitions { it => if (it.hasNext && it.next < n / 100 * 80) Thread.sleep(2000); it }
.groupBy($"value" % 1000).as[Int, Int]
.flatMapSortedGroups($"value"){ case (m, it) => if (it.hasNext && it.next == 0) Thread.sleep(10000); it }
.write.mode(SaveMode.Overwrite).csv("/tmp/spark.csv")
// keep the application alive so the executor shutdown can be observed before the shell exits
Thread.sleep(60000)
EOF
Run the code via spark-shell (set spark.driver.host to your machine's address that the minikube nodes can reach):
./spark-3.5.5-bin-hadoop3/bin/spark-shell \
--master k8s://$k8s_url \
--deploy-mode client \
--name dynamic-allocation-with-decommissioning-example \
--conf spark.default.parallelism=10 \
--conf spark.sql.shuffle.partitions=5 \
--conf spark.driver.host=192.168.179.125 \
--conf spark.kubernetes.container.image=apache/spark:3.5.5 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=16 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.executorIdleTimeout=1 \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=1 \
--conf spark.dynamicAllocation.shuffleTracking.enabled=false \
--conf spark.kubernetes.allocation.batch.size=16 \
--conf spark.kubernetes.allocation.batch.delay=101ms \
--conf spark.decommission.enabled=true \
--conf spark.storage.decommission.enabled=true \
--conf spark.storage.decommission.shuffleBlocks.enabled=true \
--conf spark.storage.decommission.fallbackStorage.path=/var/data/shuffle/ \
--conf spark.storage.decommission.fallbackStorage.cleanUp=true \
--conf spark.kubernetes.executor.volumes.hostPath.data.options.path=/tmp/spark-data \
--conf spark.kubernetes.executor.volumes.hostPath.data.options.type=DirectoryOrCreate \
--conf spark.kubernetes.executor.volumes.hostPath.data.mount.path=/var/data \
--conf spark.kubernetes.executor.volumes.hostPath.data.mount.readOnly=false \
--conf spark.kubernetes.executor.deleteOnTermination=false <<< "$scala"
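While the job runs, the scaling and decommissioning can be observed from a second terminal. The commands below are illustrative and assume the configuration above (Spark labels executor pods with spark-role=executor, and migrated shuffle blocks end up under the host's /tmp/spark-data mount):
# executors come and go as dynamic allocation scales; deleteOnTermination=false keeps terminated pods visible
kubectl get pods -l spark-role=executor --watch
# shuffle blocks moved to the fallback storage should appear here
ls -R /tmp/spark-data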