Running Spark with fallback storage decommissioning on K8S

Create local test K8S cluster (minikube)

minikube start --cpus=4 --memory=12g --nodes 4
kubectl create serviceaccount spark-sa
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark-sa --namespace=default
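
# optional sanity check: verify that the service account may manage pods
# (the Spark driver needs this permission to create executor pods)
kubectl auth can-i create pods --as=system:serviceaccount:default:spark-sa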

# location for fallback storage
mkdir -p /tmp/spark-data

# create location on all minikube nodes
nodes=$(minikube node list | cut -f 1)
for node in $nodes; do
  minikube ssh -n "$node" -- sudo mkdir -p /tmp/spark-data
done

# mount on first minikube node
# (minikube mount keeps running in the foreground, so run it in a separate terminal)
minikube mount --uid=185 --gid=185 /tmp/spark-data:/tmp/spark-data

# inspect mount details
minikube ssh -n minikube mount | grep spark-data
# this should show:
# 192.168.49.1 on /tmp/spark-data type 9p (rw,relatime,sync,dirsync,dfltuid=185,dfltgid=185,access=any,msize=262144,trans=tcp,noextend,port=36057)

# mount on all other minikube nodes
read ip on path type t options <<<$(minikube ssh -n minikube mount | grep spark-data)
nodes_but_first=$(minikube node list | cut -f 1 | tail -n +2)
for node in $nodes_but_first; do
  minikube ssh -n "$node" -- sudo mount "$ip" "$path" -t "$t" -o "${options:1:-2}"
done
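
# optional sanity check: a file created in the host directory should be
# visible on every minikube node through the mount (the file name is arbitrary)
touch /tmp/spark-data/mount-test
for node in $nodes; do
  minikube ssh -n "$node" -- ls /tmp/spark-data
done
rm /tmp/spark-data/mount-test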

Get minikube cluster URL

kubectl cluster-info

This outputs, for example:

Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

Use control plane URL as master:

k8s_url="https://192.168.49.2:8443"
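
# alternatively, extract the URL from the current kubectl context
k8s_url=$(kubectl config view --minify --output jsonpath='{.clusters[0].cluster.server}')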

Download Spark

curl -L -o spark-3.5.5-bin-hadoop3.tgz "https://www.apache.org/dyn/closer.lua/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz?action=download"
tar -xzf spark-3.5.5-bin-hadoop3.tgz
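
# quick check that the download and extraction succeeded
./spark-3.5.5-bin-hadoop3/bin/spark-submit --version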

Run some example code

The Scala code that we run:

read -r -d '' scala << 'EOF'
sc.setLogLevel("INFO")
import org.apache.spark.sql.SaveMode

val n = 100000000
val j = spark.sparkContext.broadcast(1000)
val x = spark.range(0, n, 1, 100).select($"id".cast("int"))
// most map partitions sleep 2s and one group sleeps 10s in the reduce stage,
// so tasks finish at different times, executors become idle and are
// decommissioned while the job is still running
x.as[Int]
 .mapPartitions { it => if (it.hasNext && it.next < n / 100 * 80) Thread.sleep(2000); it }
 .groupBy($"value" % 1000).as[Int, Int]
 .flatMapSortedGroups($"value"){ case (m, it) => if (it.hasNext && it.next == 0) Thread.sleep(10000); it }
 .write.mode(SaveMode.Overwrite).csv("/tmp/spark.csv")
// keep the application alive for another minute to observe the decommissioning
Thread.sleep(60000)
EOF

Run the code via spark-shell; set spark.driver.host to an IP address of your machine that is reachable from within the minikube cluster:

./spark-3.5.5-bin-hadoop3/bin/spark-shell \
    --master k8s://$k8s_url \
    --deploy-mode client \
    --name dynamic-allocation-with-decommissioning-example \
    --conf spark.default.parallelism=10 \
    --conf spark.sql.shuffle.partitions=5 \
    --conf spark.driver.host=192.168.179.125 \
    --conf spark.kubernetes.container.image=apache/spark:3.5.5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=16 \
    --conf spark.dynamicAllocation.initialExecutors=1 \
    --conf spark.dynamicAllocation.executorIdleTimeout=1 \
    --conf spark.dynamicAllocation.schedulerBacklogTimeout=1 \
    --conf spark.dynamicAllocation.shuffleTracking.enabled=false \
    --conf spark.kubernetes.allocation.batch.size=16 \
    --conf spark.kubernetes.allocation.batch.delay=101ms \
    --conf spark.decommission.enabled=true \
    --conf spark.storage.decommission.enabled=true \
    --conf spark.storage.decommission.shuffleBlocks.enabled=true \
    --conf spark.storage.decommission.fallbackStorage.path=/var/data/shuffle/ \
    --conf spark.storage.decommission.fallbackStorage.cleanUp=true \
    --conf spark.kubernetes.executor.volumes.hostPath.data.options.path=/tmp/spark-data \
    --conf spark.kubernetes.executor.volumes.hostPath.data.options.type=DirectoryOrCreate \
    --conf spark.kubernetes.executor.volumes.hostPath.data.mount.path=/var/data \
    --conf spark.kubernetes.executor.volumes.hostPath.data.mount.readOnly=false \
    --conf spark.kubernetes.executor.deleteOnTermination=false <<< "$scala"
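
# while the job runs, the effect can be observed from another terminal:
# watch executor pods being created and decommissioned
kubectl get pods --watch

# shuffle data migrated to the fallback storage shows up on the host,
# typically under a sub-directory named after the Spark application id
ls -R /tmp/spark-data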