Learning how to properly run Apache Spark in Kubernetes environments

Using the spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].* configurations:

spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.mount.path=/tmp/data1
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.mount.readOnly=false
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.options.path=/data1
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.mount.path=/tmp/data2
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.mount.readOnly=false
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.options.path=/data2
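
For example, a spark-submit sketch passing the hostPath configs above (the master URL and image are placeholders, not from this gist):

$ spark-submit \
    --master k8s://https://<api-server>:6443 \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=<spark-image> \
    --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.mount.path=/tmp/data1 \
    --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.options.path=/data1 \
    --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.mount.path=/tmp/data2 \
    --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.options.path=/data2 \
    ...

Because the volume names start with spark-local-dir-, the LocalDirsFeatureStep excerpt further below picks up their mount paths and exports them as SPARK_LOCAL_DIRS.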

KubernetesVolumeUtils.scala:

  def parseVolumesWithPrefix(sparkConf: SparkConf, prefix: String): Seq[KubernetesVolumeSpec] = {
    val properties = sparkConf.getAllWithPrefix(prefix).toMap

    getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
      val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
      val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
      val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"

      KubernetesVolumeSpec(
        volumeName = volumeName,
        mountPath = properties(pathKey),
        mountSubPath = properties.get(subPathKey).getOrElse(""),
        mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
        volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
    }.toSeq
  }
  
  private def parseVolumeSpecificConf(
      options: Map[String, String],
      volumeType: String,
      volumeName: String): KubernetesVolumeSpecificConf = {
    volumeType match {
      case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
        val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
        KubernetesHostPathVolumeConf(options(pathKey))

      case KUBERNETES_VOLUMES_PVC_TYPE =>
        val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
        KubernetesPVCVolumeConf(options(claimNameKey))

      case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
        val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
        val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
        KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey))

      case _ =>
        throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
    }
  }

k8s/Config.scala:

  val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
  val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
  val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
  val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
  val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
  val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
  val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
  val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
  val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
  val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"

Using a PVC for SPARK_LOCAL_DIRS via the executor pod template. The PVC volume goes under spec.volumes, and the volumeMounts/env go on the executor container:

  volumes:
    - name: spark-local-dir-tmp
      persistentVolumeClaim:
        claimName: shuffle-pvc-claim
  volumeMounts:
    - name: spark-local-dir-tmp
      mountPath: /shuffletmp
  env:
    - name: SPARK_LOCAL_DIRS
      value: /shuffletmp

The pod is pending. Should I create the PVC first?

  conditions:
    - type: PodScheduled
      status: 'False'
      lastProbeTime: null
      lastTransitionTime: '2020-05-21T07:24:20Z'
      reason: Unschedulable
      message: persistentvolumeclaim "shuffle-pvc-claim" not found
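
Per the Unschedulable message, the claim must already exist (in the executors' namespace) before the pod can be scheduled. A minimal sketch of creating it; the size and the use of the default storage class are assumptions, only the claim name comes from the template above:

$ kubectl apply -f - <<'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: shuffle-pvc-claim
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
EOF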

TBD

Mount tmpfs volumes as SPARK_LOCAL_DIRS: set spark.kubernetes.local.dirs.tmpfs to true, and the executor pod (template) should not explicitly declare volumes whose names start with spark-local-dir-.
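
A spark-submit sketch for the tmpfs case (master URL and image are placeholders):

$ spark-submit \
    --master k8s://https://<api-server>:6443 \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=<spark-image> \
    --conf spark.kubernetes.local.dirs.tmpfs=true \
    ...

With no spark-local-dir-* volumes declared, LocalDirsFeatureStep below creates one emptyDir volume per entry of SPARK_LOCAL_DIRS / spark.local.dir (or the default local dir), with medium set to Memory.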

k8s/Config.scala:

  val KUBERNETES_LOCAL_DIRS_TMPFS =
    ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
      .doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " +
        "their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " +
        "volumes. This may improve performance but scratch space usage will count towards " +
        "your pods memory limit so you may wish to request more memory.")
      .version("3.0.0")
      .booleanConf
      .createWithDefault(false)

LocalDirsFeatureStep.scala:

  private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

  override def configurePod(pod: SparkPod): SparkPod = {
    var localDirs = pod.container.getVolumeMounts.asScala
      .filter(_.getName.startsWith("spark-local-dir-"))
      .map(_.getMountPath)
    var localDirVolumes : Seq[Volume] = Seq()
    var localDirVolumeMounts : Seq[VolumeMount] = Seq()

    if (localDirs.isEmpty) {
      // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
      // property - we want to instead default to mounting an emptydir volume that doesn't already
      // exist in the image.
      // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
      // a bit opinionated about YARN and Mesos.
      val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
        .orElse(conf.getOption("spark.local.dir"))
        .getOrElse(defaultLocalDir)
        .split(",")
      localDirs = resolvedLocalDirs.toBuffer
      localDirVolumes = resolvedLocalDirs
        .zipWithIndex
        .map { case (_, index) =>
          new VolumeBuilder()
            .withName(s"spark-local-dir-${index + 1}")
            .withNewEmptyDir()
              .withMedium(if (useLocalDirTmpFs) "Memory" else null)
            .endEmptyDir()
            .build()
        }

      localDirVolumeMounts = localDirVolumes
        .zip(resolvedLocalDirs)
        .map { case (localDirVolume, localDirPath) =>
          new VolumeMountBuilder()
            .withName(localDirVolume.getName)
            .withMountPath(localDirPath)
            .build()
          }
    }

    val podWithLocalDirVolumes = new PodBuilder(pod.pod)
      .editSpec()
        .addToVolumes(localDirVolumes: _*)
        .endSpec()
      .build()
    val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
      .addNewEnv()
        .withName("SPARK_LOCAL_DIRS")
        .withValue(localDirs.mkString(","))
        .endEnv()
      .addToVolumeMounts(localDirVolumeMounts: _*)
      .build()
    SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
  }

From https://docs.actian.com/vector/5.1/index.html#page/User/How_to_Securely_Manage_S3_Credentials.htm

  • Make jceks file:
$ hadoop credential create fs.s3a.access.key -value MY_ACCESS_KEY -provider jceks://file/opt/s3.jceks
$ hadoop credential create fs.s3a.secret.key -value MY_SECRET_KEY -provider jceks://file/opt/s3.jceks
  • Using in the core-site.xml:
<property>
  <name>fs.s3a.security.credential.provider.path</name>
  <value>jceks://file/opt/s3.jceks</value>
</property>
  • Mount file into pod: TBD

See also: https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html
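
One way to handle the "Mount file into pod: TBD" step above: put the jceks file into a Kubernetes Secret and mount it via the spark.kubernetes.*.secrets configs, then point the provider path at the mounted location. The secret name and mount path here are assumptions:

$ kubectl create secret generic s3-jceks --from-file=s3.jceks=/opt/s3.jceks
$ spark-submit \
    --conf spark.kubernetes.driver.secrets.s3-jceks=/opt/spark/secrets \
    --conf spark.kubernetes.executor.secrets.s3-jceks=/opt/spark/secrets \
    --conf spark.hadoop.fs.s3a.security.credential.provider.path=jceks://file/opt/spark/secrets/s3.jceks \
    ...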

Cores

  • executor request cores: spark.kubernetes.executor.request.cores
  • executor limit cores: spark.kubernetes.executor.limit.cores

From https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits,

  • request: a container may use more resource than its request if the node has it available
  • limit: a container is not allowed to use more than its resource limit (see the sketch below)
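
A spark-submit sketch setting both (values are arbitrary examples):

$ spark-submit \
    --conf spark.executor.cores=1 \
    --conf spark.kubernetes.executor.request.cores=500m \
    --conf spark.kubernetes.executor.limit.cores=1 \
    ...

If spark.kubernetes.executor.request.cores is not set, the request falls back to spark.executor.cores, as in the BasicExecutorFeatureStep excerpt below.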

BasicExecutorFeatureStep.scala:

  private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
  private val executorCoresRequest =
    if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
      kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
    } else {
      executorCores.toString
    }
  private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)

Memory

if `python`
  `spark.executor.memory` + `spark.executor.memoryOverhead` + `spark.executor.pyspark.memory` or
  `spark.executor.memory` + max(`spark.executor.memory` * `spark.kubernetes.memoryOverheadFactor`, 384) + `spark.executor.pyspark.memory`
else
  `spark.executor.memory` + `spark.executor.memoryOverhead` or
  `spark.executor.memory` + max(`spark.executor.memory` * `spark.kubernetes.memoryOverheadFactor`, 384)
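
A worked example (assumed values, JVM job): with spark.executor.memory=4g (4096 MiB) and no explicit spark.executor.memoryOverhead, the overhead is max((0.1 * 4096).toInt, 384) = 409 MiB, so the executor container's memory request/limit ends up at 4096 + 409 = 4505 MiB:

$ spark-submit \
    --conf spark.executor.memory=4g \
    ...
# resulting executor container memory request/limit: 4505Mi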

BasicExecutorFeatureStep.scala:

  private val memoryOverheadMiB = kubernetesConf
    .get(EXECUTOR_MEMORY_OVERHEAD)
    .getOrElse(math.max(
      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
      MEMORY_OVERHEAD_MIN_MIB))
  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
  private val executorMemoryTotal =
    if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) {
      executorMemoryWithOverhead +
        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
    } else {
      executorMemoryWithOverhead
    }

config/package.scala:

  private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
    .doc("The amount of non-heap memory to be allocated per executor in cluster mode, " +
      "in MiB unless otherwise specified.")
    .version("2.3.0")
    .bytesConf(ByteUnit.MiB)
    .createOptional

k8s/Config.scala:

  val MEMORY_OVERHEAD_FACTOR =
    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
      .version("2.4.0")
      .doubleConf
      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
        "Ensure that memory overhead is a double between 0 --> 1.0")
      .createWithDefault(0.1)

Constants.scala:

  val MEMORY_OVERHEAD_MIN_MIB = 384L

The number of executors comes from the internal configuration spark.executor.instances, so spark.cores.max has no effect in the K8s environment.
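
For example (the count is arbitrary):

$ spark-submit \
    --conf spark.executor.instances=5 \
    ...

With dynamic allocation enabled, the initial count instead comes from the spark.dynamicAllocation.* settings, as in SchedulerBackendUtils below.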

KubernetesClusterSchedulerBackend.scala:

  private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)

SchedulerBackendUtils.scala

  def getInitialTargetExecutorNumber(
      conf: SparkConf,
      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
    if (Utils.isDynamicAllocationEnabled(conf)) {
      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
        s"initial executor number $initialNumExecutors must between min executor number " +
          s"$minNumExecutors and max executor number $maxNumExecutors")

      initialNumExecutors
    } else {
      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
    }
  }

config/package.scala:

  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
    .version("1.0.0")
    .intConf
    .createOptional

Pod template config

k8s/Config.scala:

  val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
    ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
      .doc("File containing a template pod spec for executors")
      .version("3.0.0")
      .stringConf
      .createOptional

Pod template rendering

KubernetesClusterManager.scala:

    if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
      KubernetesUtils.loadPodFromTemplate(
        kubernetesClient,
        new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get),
        sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
    }

KubernetesUtils.scala:

  def loadPodFromTemplate(
      kubernetesClient: KubernetesClient,
      templateFile: File,
      containerName: Option[String]): SparkPod = {
    try {
      val pod = kubernetesClient.pods().load(templateFile).get()
      selectSparkContainer(pod, containerName)
    } catch {
      case e: Exception =>
        logError(
          s"Encountered exception while attempting to load initial pod spec from file", e)
        throw new SparkException("Could not load pod from template file.", e)
    }
  }

executor-template.yaml:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: v1
kind: Pod
metadata:
  labels:
    template-label-key: executor-template-label-value
spec:
  containers:
  - name: test-executor-container
    image: will-be-overwritten
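
To use a template like the one above, point the executor config at it; a sketch (the file path is a placeholder):

$ spark-submit \
    --conf spark.kubernetes.executor.podTemplateFile=/path/to/executor-template.yaml \
    --conf spark.kubernetes.executor.podTemplateContainerName=test-executor-container \
    ...

The template is only a starting point: feature steps such as BasicExecutorFeatureStep still overwrite fields like the container image (hence "will-be-overwritten" above).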