Skip to content

Instantly share code, notes, and snippets.

@kotlovs
Created September 22, 2024 22:15
Show Gist options
  • Save kotlovs/437809684d7ebe3b7a93a1af804def8c to your computer and use it in GitHub Desktop.
Save kotlovs/437809684d7ebe3b7a93a1af804def8c to your computer and use it in GitHub Desktop.
// Requires the Kubernetes Java client. Gradle dependency: implementation group: 'io.kubernetes', name: 'client-java'
import io.kubernetes.client.openapi.ApiResponse
import io.kubernetes.client.openapi.apis.CoreV1Api
import io.kubernetes.client.util.ClientBuilder
import org.apache.http.HttpStatus
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import java.sql.Timestamp
import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
import scala.annotation.tailrec
import scala.collection.JavaConverters._
object SparkConnectMonitoring {
implicit val formats: DefaultFormats.type = DefaultFormats
private val baseScApiUrl = "https://spark-connect.joom.ai/api/v1"
private val k8sNamespace = "spark-connect-service"
private val scDriverPrefix = "spark-connect-service-"
private val latestCompletionTimeIntervalMins = 4
private val restartTimeoutHours = Duration.ofHours(3)
def run(): Unit = {
val appId = getCurrentScAppId
val runningJobs = getJobs("running", appId)
if (runningJobs.exists(_.submissionTime.isBefore(now().minusHours(3)))) {
SendAlert("SparkConnectMonitoring", s"There are long running jobs in SC. " +
s"Earliest submissionTime - ${runningJobs.minBy(t => Timestamp.valueOf(t.submissionTime).getTime).submissionTime}")
} else {
tryToRestart(appId, startTime = now(), timeout = restartTimeoutHours, restartAnyway = false)
}
}
@tailrec
private def tryToRestart(appId: String, startTime: LocalDateTime, timeout: Duration, restartAnyway: Boolean): Unit = {
val runningJobs = getJobs("running", appId)
val completedJobs = getJobs("succeeded", appId) ++ getJobs("failed", appId)
val latestCompletionTime = completedJobs.maxBy(t => Timestamp.valueOf(t.completionTime.get).getTime)
.completionTime.get
if (runningJobs.isEmpty && latestCompletionTime.isBefore(now().minusMinutes(latestCompletionTimeIntervalMins))) {
restartSparkConnect()
} else if (now().isAfter(startTime.plus(timeout))) {
if (restartAnyway)
restartSparkConnect()
else
SendAlert("SparkConnectMonitoring", s"There was no suitable opportunity to restart SC in $timeout")
} else {
// Still waiting for the appropriate moment to restart SC.
Thread.sleep(latestCompletionTimeIntervalMins * 60 * 1000)
tryToRestart(appId, startTime, timeout, restartAnyway)
}
}
private def getJobs(status: String, appId: String): Seq[Job] = {
val response = parse(HttpUtils.httpGet(baseScApiUrl + s"/applications/$appId/jobs?status=$status"))
response.children.map { job =>
val submissionTime = parseTs((job \ "submissionTime").extract[String])
val completionTime = status match {
case "running" => None
case "succeeded" | "failed" => Some(parseTs((job \ "completionTime").extract[String]))
}
Job(status, submissionTime, completionTime)
}
}
private def getCurrentScAppId = {
val response = parse(HttpUtils.httpGet(baseScApiUrl + "/applications"))
val apps = response.children
assert(apps.size == 1)
(apps.head \ "id").extract[String]
}
private def parseTs(ts: String): LocalDateTime = {
LocalDateTime.ofInstant(Instant.parse(ts.replace("GMT", "Z")), ZoneOffset.UTC)
}
private def now(): LocalDateTime = {
LocalDateTime.now(ZoneOffset.UTC)
}
private def restartSparkConnect(): Unit = {
SendAlert("SparkConnectMonitoring", s"Restart SparkConnect")
val k8sApi = new CoreV1Api(ClientBuilder.standard().build())
val driverPod = getResult(k8sApi.listNamespacedPod(k8sNamespace).executeWithHttpInfo()).getItems.asScala
.filter(pod => pod.getMetadata.getName.startsWith(scDriverPrefix) && pod.getStatus.getPhase == "Running")
assert(driverPod.size == 1, s"Can't find the SC driver pod.")
val driverPodName = driverPod.head.getMetadata.getName
// Try to kill the SC driver pod.
getResult(k8sApi.deleteNamespacedPod(driverPodName, k8sNamespace)
.gracePeriodSeconds(300)
.executeWithHttpInfo())
}
private def getResult[T](result: ApiResponse[T]): T = {
if (result.getStatusCode != HttpStatus.SC_OK)
throw new Exception(s"K8S api request returns a bad status code: ${result.getStatusCode}")
result.getData
}
}
case class Job(
status: String,
submissionTime: LocalDateTime,
completionTime: Option[LocalDateTime],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment