Created
November 12, 2020 19:12
-
-
Save dimastatz/02180157944367b4fac7819a19512505 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SparkListenerClient extends SparkListener { | |
import System.currentTimeMillis | |
case class State(id: String, start: Long, duration: Long, cores: Int, closed: Boolean) | |
val executors: mutable.Map[String, State] = mutable.Map[String, State]() | |
override def onExecutorAdded(ex: SparkListenerExecutorAdded): Unit = | |
this.synchronized { | |
executors(ex.executorId) = | |
State(ex.executorId, currentTimeMillis(), 0, ex.executorInfo.totalCores, closed = false) | |
} | |
override def onExecutorRemoved(ex: SparkListenerExecutorRemoved): Unit = | |
this.synchronized { | |
val state = executors(ex.executorId) | |
executors(ex.executorId) = | |
State(state.id, state.start, currentTimeMillis() - state.start, state.cores, closed = true) | |
} | |
override def onExecutorBlacklisted(ex: SparkListenerExecutorBlacklisted): Unit = | |
this.synchronized { | |
val state = executors(ex.executorId) | |
executors(ex.executorId) = | |
State(state.id, state.start, currentTimeMillis() - state.start, state.cores, closed = true) | |
} | |
override def onExecutorUnblacklisted(ex: SparkListenerExecutorUnblacklisted): Unit = | |
this.synchronized { | |
val state = executors(ex.executorId) | |
executors(ex.executorId) = State(state.id, state.start, state.duration, state.cores, closed = false) | |
} | |
override def onApplicationEnd(app: SparkListenerApplicationEnd): Unit = { | |
super.onApplicationEnd(app) | |
closeExecutors() | |
} | |
def closeExecutors(): Unit = { | |
executors.keys | |
.filter(k => !executors(k).closed) | |
.foreach(k => { | |
val state = executors(k) | |
executors(k) = | |
State(state.id, state.start, System.currentTimeMillis() - state.start, state.cores, closed = true) | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment