Gists by Dima Statz (dimastatz)
package org.apache.spark.metrics.source

import com.codahale.metrics._

object LegionMetrics {
  val metrics = new LegionMetrics
}

class LegionMetrics extends Source {
  override val sourceName: String = "LegionCommonSource"
  // Spark's Source trait also requires a MetricRegistry; the two metrics
  // below are inferred from the usage lines that follow.
  override val metricRegistry: MetricRegistry = new MetricRegistry
  val totalEvents: Counter = metricRegistry.counter(MetricRegistry.name("TotalEvents"))
  val runTime: Histogram = metricRegistry.histogram(MetricRegistry.name("RunTime"))
}

// Usage from application code:
LegionMetrics.metrics.totalEvents.inc(batch.count())
LegionMetrics.metrics.runTime.update(System.currentTimeMillis - start)
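A custom Source becomes visible to sinks only after it is registered with Spark's MetricsSystem. A minimal sketch, assuming the driver registers it once at startup; because MetricsSystem is private[spark], this call has to live under the org.apache.spark package, which is why the source above sits in org.apache.spark.metrics.source:

import org.apache.spark.SparkEnv

// Register the custom source so configured sinks can expose its metrics
SparkEnv.get.metricsSystem.registerSource(LegionMetrics.metrics)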
# Example configuration for PrometheusServlet
# Master metrics - http://localhost:8080/metrics/master/prometheus/
# Worker metrics - http://localhost:8081/metrics/prometheus/
# Driver metrics - http://localhost:4040/metrics/prometheus/
# Executors metrics - http://localhost:4040/metrics/executors/prometheus
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
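The same sink can also be enabled without shipping a metrics.properties file, since every key can be passed through Spark configuration with the spark.metrics.conf. prefix. A sketch assuming Spark 3.0+, where spark.ui.prometheus.enabled additionally exposes the executor endpoint on the driver UI:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("prometheus-metrics-demo") // illustrative name
  // Mirrors the *.sink.prometheusServlet.* lines above
  .config("spark.metrics.conf.*.sink.prometheusServlet.class",
    "org.apache.spark.metrics.sink.PrometheusServlet")
  .config("spark.metrics.conf.*.sink.prometheusServlet.path", "/metrics/prometheus")
  // Serves executor metrics at /metrics/executors/prometheus (Spark 3.0+)
  .config("spark.ui.prometheus.enabled", "true")
  .getOrCreate()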
import org.apache.spark.storage.StorageLevel

val df = spark
  .read
  .option("delimiter", " ")
  .option("comment", "#")
  .csv(compressedFilesPath)
  .repartition(numPartitions = numOfPartitions)
  .persist(StorageLevel.MEMORY_ONLY)

// Trigger read & unzip: count() is an action, so it materializes the
// DataFrame into the MEMORY_ONLY cache
df.count()
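Gzip is not a splittable codec, so each compressed file is read into a single partition and only the repartition() call spreads the rows across the cluster. A quick check, using the df and numOfPartitions defined above:

// Should print the requested partition count after repartition()
println(s"partitions: ${df.rdd.getNumPartitions}")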
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.scheduler._

object SparkListenerClient {
  def start(sc: SparkContext, enabled: Boolean): Option[SparkListenerClient] = {
    if (enabled) {
      val sl = new SparkListenerClient()
      sc.addSparkListener(sl)
      Some(sl)
    } else None
  }

  // Detach the listener when the application shuts down
  def stop(sc: SparkContext, sl: SparkListenerClient): Unit =
    sc.removeSparkListener(sl)
}

class SparkListenerClient extends SparkListener {
  import System.currentTimeMillis

  case class State(id: String, start: Long, duration: Long, cores: Int, closed: Boolean)
  val executors: mutable.Map[String, State] = mutable.Map[String, State]()

  override def onExecutorAdded(ex: SparkListenerExecutorAdded): Unit =
    this.synchronized {
      // Completes the truncated original: record a fresh, open State per executor
      executors(ex.executorId) =
        State(ex.executorId, currentTimeMillis, 0L, ex.executorInfo.totalCores, closed = false)
    }
}
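A natural counterpart closes an executor's State when it is removed; a sketch to be added inside SparkListenerClient, assuming the same bookkeeping (the duration computation is illustrative):

// Mark the executor's State as closed and record its lifetime
override def onExecutorRemoved(ex: SparkListenerExecutorRemoved): Unit =
  this.synchronized {
    executors.get(ex.executorId).foreach { s =>
      executors(ex.executorId) =
        s.copy(duration = System.currentTimeMillis - s.start, closed = true)
    }
  }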
FROM openjdk:8 AS build

# Env variables
ENV SCALA_VERSION 2.11.12
ENV SBT_VERSION 1.2.8

# Install Scala
## Piping curl directly into tar; the symlink step completes the truncated
## original command (assumed)
RUN \
  curl -fsL https://downloads.typesafe.com/scala/$SCALA_VERSION/scala-$SCALA_VERSION.tgz | tar xfz - -C /root/ && \
  ln -s /root/scala-$SCALA_VERSION/bin/* /usr/local/bin/
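SBT_VERSION is declared but never used in the fragment; a plausible next step, sketched from the common curl-based install pattern (the release URL and unpack layout are assumptions to verify):

# Install sbt (assumed continuation of the original Dockerfile)
RUN \
  curl -fsL https://github.com/sbt/sbt/releases/download/v$SBT_VERSION/sbt-$SBT_VERSION.tgz | tar xfz - -C /usr/local && \
  ln -s /usr/local/sbt/bin/sbt /usr/local/bin/sbt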
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


def run_spark_app(config_name, dag):
    eks_namespace = config.dim_value(config_name, 'eks_namespace')
    eks_in_cluster = bool(config.dim_value(config_name, 'eks_in_cluster'))
    eks_service_account_name = config.dim_value(config_name, 'eks_service_account_name')
    eks_context = None if eks_in_cluster else config.dim_value(config_name, 'eks_context')
    eks_commands = format_spark_submit(config_name, config.dim_value(config_name, 'eks_commands'))
    # kwargs below complete the truncated original call (assumed mapping)
    return KubernetesPodOperator(
        dag=dag,
        cmds=eks_commands,
        task_id=config_name,
        name=config_name,
        namespace=eks_namespace,
        in_cluster=eks_in_cluster,
        cluster_context=eks_context,
        service_account_name=eks_service_account_name,
    )
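Wired into a DAG, the helper behaves like any other operator factory; a minimal usage sketch (the DAG arguments and the 'my_spark_app' config name are illustrative):

from datetime import datetime
from airflow import DAG

with DAG(dag_id="spark_on_eks", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily") as dag:
    spark_task = run_spark_app("my_spark_app", dag)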
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
# roleRef and subjects complete the truncated original; the built-in 'edit'
# ClusterRole and the 'default' namespace follow the Spark-on-Kubernetes docs
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default
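The same binding can be created imperatively with the one-liner from the Spark-on-Kubernetes documentation:

kubectl create clusterrolebinding spark-role --clusterrole=edit \
  --serviceaccount=default:spark --namespace=default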
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: spark-cluster
  region: us-east-1
  version: "1.14"
availabilityZones: ["us-east-1a", "us-east-1b", "us-east-1c"]
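As written, the ClusterConfig provisions a control plane with no workers; a hedged nodeGroups addition (instance type and capacities are illustrative):

nodeGroups:
  - name: spark-workers
    instanceType: m5.xlarge
    desiredCapacity: 3
    minSize: 1
    maxSize: 5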