Skip to content

Instantly share code, notes, and snippets.

@dimastatz
Created November 12, 2020 19:12
Show Gist options
  • Save dimastatz/02180157944367b4fac7819a19512505 to your computer and use it in GitHub Desktop.
Save dimastatz/02180157944367b4fac7819a19512505 to your computer and use it in GitHub Desktop.
class SparkListenerClient extends SparkListener {
import System.currentTimeMillis
case class State(id: String, start: Long, duration: Long, cores: Int, closed: Boolean)
val executors: mutable.Map[String, State] = mutable.Map[String, State]()
override def onExecutorAdded(ex: SparkListenerExecutorAdded): Unit =
this.synchronized {
executors(ex.executorId) =
State(ex.executorId, currentTimeMillis(), 0, ex.executorInfo.totalCores, closed = false)
}
override def onExecutorRemoved(ex: SparkListenerExecutorRemoved): Unit =
this.synchronized {
val state = executors(ex.executorId)
executors(ex.executorId) =
State(state.id, state.start, currentTimeMillis() - state.start, state.cores, closed = true)
}
override def onExecutorBlacklisted(ex: SparkListenerExecutorBlacklisted): Unit =
this.synchronized {
val state = executors(ex.executorId)
executors(ex.executorId) =
State(state.id, state.start, currentTimeMillis() - state.start, state.cores, closed = true)
}
override def onExecutorUnblacklisted(ex: SparkListenerExecutorUnblacklisted): Unit =
this.synchronized {
val state = executors(ex.executorId)
executors(ex.executorId) = State(state.id, state.start, state.duration, state.cores, closed = false)
}
override def onApplicationEnd(app: SparkListenerApplicationEnd): Unit = {
super.onApplicationEnd(app)
closeExecutors()
}
def closeExecutors(): Unit = {
executors.keys
.filter(k => !executors(k).closed)
.foreach(k => {
val state = executors(k)
executors(k) =
State(state.id, state.start, System.currentTimeMillis() - state.start, state.cores, closed = true)
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment