LegionMetrics.metrics.totalEvents.inc(batch.count())
LegionMetrics.metrics.runTime.update(System.currentTimeMillis - start)
package org.apache.spark.metrics.source

import com.codahale.metrics._

object LegionMetrics {
  val metrics = new LegionMetrics
}

class LegionMetrics extends Source {
  override val sourceName: String = "LegionCommonSource"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  // The two metrics used by the driver code above: a counter and a histogram
  val totalEvents: Counter = metricRegistry.counter(MetricRegistry.name("totalEvents"))
  val runTime: Histogram = metricRegistry.histogram(MetricRegistry.name("runTime"))
}
# Example configuration for PrometheusServlet
# Master metrics    - http://localhost:8080/metrics/master/prometheus/
# Worker metrics    - http://localhost:8081/metrics/prometheus/
# Driver metrics    - http://localhost:4040/metrics/prometheus/
# Executors metrics - http://localhost:4040/metrics/executors/prometheus

*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
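The servlet endpoints above return metrics in Prometheus' plain-text exposition format, so they are easy to inspect outside of a Prometheus server too. A minimal sketch of fetching and parsing one endpoint in Python (the URL is the driver endpoint from the comments above; the parser is an illustration and ignores optional timestamps):

```python
from urllib.request import urlopen


def parse_prometheus_text(text):
    """Parse Prometheus text-exposition lines into a {name: value} dict.

    Comment lines (# HELP / # TYPE) and blanks are skipped; the value is
    taken as everything after the last space, so labeled series keep
    their labels inside the key.
    """
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, _, value = line.rpartition(" ")
        metrics[name] = float(value)
    return metrics


def scrape(url="http://localhost:4040/metrics/prometheus/"):
    """Fetch and parse one of the servlet endpoints configured above."""
    return parse_prometheus_text(urlopen(url).read().decode())
```

Pointing `scrape` at the master, worker, or executor endpoints works the same way, since they all share the exposition format.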
val df = spark
  .read
  .option("delimiter", " ")
  .option("comment", "#")
  .csv(compressedFilesPath)
  .repartition(numPartitions = numOfPartitions)
  .persist(StorageLevel.MEMORY_ONLY)

// Force an action so the files are actually read & unzipped into the cache
df.count()
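The snippet repartitions the cached DataFrame into `numOfPartitions` pieces. One common heuristic for picking that number (an illustration, not taken from the original code) is to divide the uncompressed data size by a target partition size, often around 128 MB:

```python
import math


def num_partitions(total_bytes, target_partition_bytes=128 * 1024 * 1024):
    """Heuristic partition count: ceil(total size / target partition size).

    Always returns at least 1 so tiny inputs still get one partition.
    """
    return max(1, math.ceil(total_bytes / target_partition_bytes))
```

With compressed, non-splittable inputs like gzip the estimate should use the *uncompressed* size, since that is what ends up in the in-memory cache.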
object SparkListenerClient {
  def start(sc: SparkContext, enabled: Boolean): Option[SparkListenerClient] = {
    if (enabled) {
      val sl = new SparkListenerClient()
      sc.addSparkListener(sl)
      Some(sl)
    } else None
  }

  def stop(sc: SparkContext, sl: SparkListenerClient): Unit = {
    sc.removeSparkListener(sl)
  }
}
import scala.collection.mutable

import org.apache.spark.scheduler._

class SparkListenerClient extends SparkListener {
  import System.currentTimeMillis

  case class State(id: String, start: Long, duration: Long, cores: Int, closed: Boolean)

  val executors: mutable.Map[String, State] = mutable.Map[String, State]()

  override def onExecutorAdded(ex: SparkListenerExecutorAdded): Unit =
    this.synchronized {
      executors(ex.executorId) =
        State(ex.executorId, currentTimeMillis, 0L, ex.executorInfo.totalCores, closed = false)
    }
}
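The per-executor `State` records make it straightforward to derive aggregate usage once executors are removed and their durations are known. A hedged Python sketch (the field layout mirrors the Scala case class above; the aggregation itself is an illustration, not code from the post) of summing core-milliseconds across finished executors:

```python
from dataclasses import dataclass


@dataclass
class State:
    """Mirror of the Scala case class tracked per executor."""
    id: str
    start: int      # epoch millis when the executor was added
    duration: int   # millis the executor was alive (0 until removed)
    cores: int
    closed: bool    # True once the executor has been removed


def total_core_millis(states):
    """Sum cores * lifetime over executors whose lifetime is known."""
    return sum(s.cores * s.duration for s in states if s.closed)
```

A number like this is useful for charging Spark jobs back to teams by actual compute consumed rather than by wall-clock time.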
FROM openjdk:8 AS build

# Env variables
ENV SCALA_VERSION 2.11.12
ENV SBT_VERSION 1.2.8

# Install Scala
## Piping curl directly into tar
RUN curl -fsL https://downloads.typesafe.com/scala/$SCALA_VERSION/scala-$SCALA_VERSION.tgz | tar xfz - -C /root/
def run_spark_app(config_name, dag):
    eks_namespace = config.dim_value(config_name, 'eks_namespace')
    eks_in_cluster = bool(config.dim_value(config_name, 'eks_in_cluster'))
    eks_service_account_name = config.dim_value(config_name, 'eks_service_account_name')
    eks_context = None if eks_in_cluster else config.dim_value(config_name, 'eks_context')
    eks_commands = format_spark_submit(config_name, config.dim_value(config_name, 'eks_commands'))

    return KubernetesPodOperator(
        dag=dag,
        cmds=eks_commands,
        namespace=eks_namespace,
        in_cluster=eks_in_cluster,
        cluster_context=eks_context,
        service_account_name=eks_service_account_name,
        task_id=config_name,  # task_id/name values here are illustrative
        name=config_name,
    )
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: spark-cluster
  region: us-east-1
  version: "1.14"

availabilityZones: ["us-east-1a", "us-east-1b", "us-east-1c"]